finished simple test for TaskTracker(MapReduce Mode) and change serialize framwork to fst because of fastjson's indian

This commit is contained in:
tjq 2020-03-25 21:24:47 +08:00
parent ab642805c4
commit d44f818b81
10 changed files with 131 additions and 17 deletions

View File

@ -20,8 +20,8 @@
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<guava.version>28.2-jre</guava.version>
<fastjson.version>1.2.58</fastjson.version>
<junit.version>5.6.1</junit.version>
<fst.version>2.56</fst.version>
</properties>
<dependencies>
@ -68,11 +68,11 @@
<version>${guava.version}</version>
</dependency>
<!-- fastJSON -->
<!-- FST 超超超高性能序列化框架 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>${fst.version}</version>
</dependency>
<!-- Junit 测试 -->

View File

@ -76,6 +76,8 @@ public class TaskTrackerActor extends AbstractActor {
subTask.setTaskName(req.getTaskName());
subTask.setTaskId(originSubTask.getTaskId());
subTask.setTaskContent(originSubTask.getTaskContent());
subTaskList.add(subTask);
});
success = taskTracker.addTask(subTaskList);

View File

@ -0,0 +1,35 @@
package com.github.kfcfans.oms.worker.common.utils;
import org.nustaq.serialization.FSTConfiguration;
import java.nio.charset.StandardCharsets;
/**
* 序列化框架
*
* @author tjq
* @since 2020/3/25
*/
public class SerializerUtils {
private static FSTConfiguration conf = FSTConfiguration.createDefaultConfiguration();
public static byte[] serialize(Object obj) {
return conf.asByteArray(obj);
}
public static Object deSerialized(byte[] bytes) {
return conf.asObject(bytes);
}
public static String toJSON(Object object) {
if (object == null) {
return null;
}
try {
return new String(serialize(object), StandardCharsets.UTF_8);
}catch (Exception ignore) {
}
return null;
}
}

View File

@ -1,12 +1,12 @@
package com.github.kfcfans.oms.worker.core.executor;
import akka.actor.ActorSelection;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.oms.worker.common.ThreadLocalStore;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.common.utils.SerializerUtils;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
@ -54,7 +54,7 @@ public class ProcessorRunnable implements Runnable {
TaskContext taskContext = new TaskContext();
BeanUtils.copyProperties(request, taskContext);
if (request.getSubTaskContent() != null && request.getSubTaskContent().length > 0) {
taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent()));
taskContext.setSubTask(SerializerUtils.deSerialized(request.getSubTaskContent()));
}
ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
@ -104,6 +104,8 @@ public class ProcessorRunnable implements Runnable {
ProcessResult lastResult;
Map<String, String> taskId2ResultMap = TaskPersistenceService.INSTANCE.getTaskId2ResultMap(instanceId);
// 去除本任务
taskId2ResultMap.remove(TaskConstant.LAST_TASK_ID);
try {
switch (executeType) {

View File

@ -57,6 +57,11 @@ public class SimpleTaskQuery {
}
String substring = sb.substring(0, sb.length() - LINK.length());
if (!StringUtils.isEmpty(otherCondition)) {
substring += otherCondition;
}
if (limit != null) {
substring = substring + " limit " + limit;
}

View File

@ -1,7 +1,6 @@
package com.github.kfcfans.oms.worker.pojo.request;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.github.kfcfans.oms.worker.common.utils.SerializerUtils;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
@ -44,7 +43,7 @@ public class ProcessorMapTaskRequest implements Serializable {
// 不同执行线程之间前缀taskId不同该ID可以保证分布式唯一
String subTaskId = taskContext.getTaskId() + "." + i;
// 写入类名方便反序列化
byte[] content = JSON.toJSONBytes(subTaskList.get(i), SerializerFeature.WriteClassName);
byte[] content = SerializerUtils.serialize(subTaskList.get(i));
subTasks.add(new SubTask(subTaskId, content));
}

View File

@ -48,6 +48,7 @@ public class TaskTrackerStartTaskReq implements Serializable {
public TaskTrackerStartTaskReq(JobInstanceInfo instanceInfo, TaskDO task) {
jobId = instanceInfo.getJobId();
instanceId = instanceInfo.getInstanceId();
processorType = instanceInfo.getProcessorType();
@ -60,6 +61,7 @@ public class TaskTrackerStartTaskReq implements Serializable {
jobParams = instanceInfo.getJobParams();
instanceParams = instanceInfo.getInstanceParams();
taskId = task.getTaskId();
taskName = task.getTaskName();
subTaskContent = task.getTaskContent();

View File

@ -41,23 +41,46 @@ public class TaskTrackerTest {
@Test
public void testStandaloneJob() throws Exception {
remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.STANDALONE), null);
Thread.sleep(500000);
}
@Test
public void testMapReduceJob() throws Exception {
remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.MAP_REDUCE), null);
Thread.sleep(500000);
}
private static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType) {
ServerScheduleJobReq req = new ServerScheduleJobReq();
req.setJobId("1");
req.setInstanceId("10086");
req.setAllWorkerAddress(NetUtils.getLocalHost());
req.setExecuteType(ExecuteType.STANDALONE.name());
req.setJobParams("this is job Params");
req.setInstanceParams("this is instance Params");
req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor");
req.setTaskRetryNum(3);
req.setThreadConcurrency(5);
req.setTimeLimit(500000);
remoteTaskTracker.tell(req, null);
switch (executeType) {
case STANDALONE:
req.setExecuteType(ExecuteType.STANDALONE.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor");
break;
case MAP_REDUCE:
req.setExecuteType(ExecuteType.MAP_REDUCE.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestMapReduceProcessor");
break;
case BROADCAST:
req.setExecuteType(ExecuteType.BROADCAST.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBroadcastProcessor");
break;
}
Thread.sleep(500000);
return req;
}

View File

@ -0,0 +1,37 @@
package com.github.kfcfans.oms.processors;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;
import java.util.Map;
/**
* 测试用的广播执行处理器
*
* @author tjq
* @since 2020/3/25
*/
public class TestBroadcastProcessor implements BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext taskContext) throws Exception {
System.out.println("=============== TestBroadcastProcessor#preProcess ===============");
System.out.println("taskContext:" + taskContext);
return new ProcessResult(true, "preProcess success");
}
@Override
public ProcessResult postProcess(TaskContext taskContext, Map<String, String> taskId2Result) throws Exception {
System.out.println("=============== TestBroadcastProcessor#postProcess ===============");
System.out.println("taskContext:" + taskContext);
System.out.println("taskId2Result:" + taskId2Result);
return new ProcessResult(true, "postProcess success");
}
@Override
public ProcessResult process(TaskContext context) throws Exception {
System.out.println("=============== TestBroadcastProcessor#process ===============");
System.out.println("taskContext:" + context);
return new ProcessResult(true, "processSuccess");
}
}

View File

@ -8,6 +8,7 @@ import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@ -22,7 +23,9 @@ public class TestMapReduceProcessor extends MapReduceProcessor {
@Override
public ProcessResult reduce(TaskContext taskContext, Map<String, String> taskId2Result) {
System.out.println("============== TestMapReduceProcessor#reduce ==============");
return new ProcessResult(true, "reduce success");
System.out.println("taskContext:" + taskContext);
System.out.println("taskId2Result:" + taskId2Result);
return new ProcessResult(true, "REDUCE_SUCCESS");
}
@Override
@ -39,19 +42,25 @@ public class TestMapReduceProcessor extends MapReduceProcessor {
}
ProcessResult mapResult = map(subTasks, "MAP_TEST_TASK");
System.out.println("map result = " + mapResult);
return new ProcessResult(true, "MAP_SUCCESS");
}else {
System.out.println("start to process");
System.out.println(context.getSubTask());
return new ProcessResult(true, "PROCESS_SUCCESS");
}
return new ProcessResult(true, "PROCESS_SUCCESS");
}
@ToString
@NoArgsConstructor
@AllArgsConstructor
private static class TestSubTask {
private static class TestSubTask implements Serializable {
private String name;
private int age;
}
@Override
public void init() throws Exception {
System.out.println("============== TestMapReduceProcessor#init ==============");
}
}