diff --git a/oh-my-scheduler-worker/pom.xml b/oh-my-scheduler-worker/pom.xml
index 4345c17a..eea81ec8 100644
--- a/oh-my-scheduler-worker/pom.xml
+++ b/oh-my-scheduler-worker/pom.xml
@@ -20,8 +20,8 @@
1.4.200
3.4.2
28.2-jre
- 1.2.58
5.6.1
+ 2.56
@@ -68,11 +68,11 @@
${guava.version}
-
+
- com.alibaba
- fastjson
- ${fastjson.version}
+ de.ruedigermoeller
+ fst
+ ${fst.version}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java
index 76490786..8ab946af 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java
@@ -76,6 +76,8 @@ public class TaskTrackerActor extends AbstractActor {
subTask.setTaskName(req.getTaskName());
subTask.setTaskId(originSubTask.getTaskId());
subTask.setTaskContent(originSubTask.getTaskContent());
+
+ subTaskList.add(subTask);
});
success = taskTracker.addTask(subTaskList);
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SerializerUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SerializerUtils.java
new file mode 100644
index 00000000..f324bd01
--- /dev/null
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SerializerUtils.java
@@ -0,0 +1,35 @@
+package com.github.kfcfans.oms.worker.common.utils;
+
+import org.nustaq.serialization.FSTConfiguration;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * 序列化框架
+ *
+ * @author tjq
+ * @since 2020/3/25
+ */
+public class SerializerUtils {
+
+ private static FSTConfiguration conf = FSTConfiguration.createDefaultConfiguration();
+
+ public static byte[] serialize(Object obj) {
+ return conf.asByteArray(obj);
+ }
+
+ public static Object deSerialized(byte[] bytes) {
+ return conf.asObject(bytes);
+ }
+
+ public static String toJSON(Object object) {
+ if (object == null) {
+ return null;
+ }
+ try {
+ return new String(serialize(object), StandardCharsets.UTF_8);
+ }catch (Exception ignore) {
+ }
+ return null;
+ }
+}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
index 93ffa47c..9ca891db 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
@@ -1,12 +1,12 @@
package com.github.kfcfans.oms.worker.core.executor;
import akka.actor.ActorSelection;
-import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.oms.worker.common.ThreadLocalStore;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
+import com.github.kfcfans.oms.worker.common.utils.SerializerUtils;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
@@ -54,7 +54,7 @@ public class ProcessorRunnable implements Runnable {
TaskContext taskContext = new TaskContext();
BeanUtils.copyProperties(request, taskContext);
if (request.getSubTaskContent() != null && request.getSubTaskContent().length > 0) {
- taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent()));
+ taskContext.setSubTask(SerializerUtils.deSerialized(request.getSubTaskContent()));
}
ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
@@ -104,6 +104,8 @@ public class ProcessorRunnable implements Runnable {
ProcessResult lastResult;
Map taskId2ResultMap = TaskPersistenceService.INSTANCE.getTaskId2ResultMap(instanceId);
+ // 去除本任务
+ taskId2ResultMap.remove(TaskConstant.LAST_TASK_ID);
try {
switch (executeType) {
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java
index 351a88f0..89afb841 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java
@@ -57,6 +57,11 @@ public class SimpleTaskQuery {
}
String substring = sb.substring(0, sb.length() - LINK.length());
+
+ if (!StringUtils.isEmpty(otherCondition)) {
+ substring += otherCondition;
+ }
+
if (limit != null) {
substring = substring + " limit " + limit;
}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java
index d3c684bb..0bba1e5d 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java
@@ -1,7 +1,6 @@
package com.github.kfcfans.oms.worker.pojo.request;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.github.kfcfans.oms.worker.common.utils.SerializerUtils;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
@@ -44,7 +43,7 @@ public class ProcessorMapTaskRequest implements Serializable {
// 不同执行线程之间,前缀(taskId)不同,该ID可以保证分布式唯一
String subTaskId = taskContext.getTaskId() + "." + i;
// 写入类名,方便反序列化
- byte[] content = JSON.toJSONBytes(subTaskList.get(i), SerializerFeature.WriteClassName);
+ byte[] content = SerializerUtils.serialize(subTaskList.get(i));
subTasks.add(new SubTask(subTaskId, content));
}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java
index 5377494a..99711eb4 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java
@@ -48,6 +48,7 @@ public class TaskTrackerStartTaskReq implements Serializable {
public TaskTrackerStartTaskReq(JobInstanceInfo instanceInfo, TaskDO task) {
+
jobId = instanceInfo.getJobId();
instanceId = instanceInfo.getInstanceId();
processorType = instanceInfo.getProcessorType();
@@ -60,6 +61,7 @@ public class TaskTrackerStartTaskReq implements Serializable {
jobParams = instanceInfo.getJobParams();
instanceParams = instanceInfo.getInstanceParams();
+ taskId = task.getTaskId();
taskName = task.getTaskName();
subTaskContent = task.getTaskContent();
diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java
index c8f71242..2faf433c 100644
--- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java
+++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java
@@ -41,23 +41,46 @@ public class TaskTrackerTest {
@Test
public void testStandaloneJob() throws Exception {
+ remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.STANDALONE), null);
+ Thread.sleep(500000);
+ }
+
+ @Test
+ public void testMapReduceJob() throws Exception {
+ remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.MAP_REDUCE), null);
+ Thread.sleep(500000);
+ }
+
+ private static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType) {
ServerScheduleJobReq req = new ServerScheduleJobReq();
req.setJobId("1");
req.setInstanceId("10086");
req.setAllWorkerAddress(NetUtils.getLocalHost());
- req.setExecuteType(ExecuteType.STANDALONE.name());
+
req.setJobParams("this is job Params");
req.setInstanceParams("this is instance Params");
req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
- req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor");
req.setTaskRetryNum(3);
req.setThreadConcurrency(5);
req.setTimeLimit(500000);
- remoteTaskTracker.tell(req, null);
+ switch (executeType) {
+ case STANDALONE:
+ req.setExecuteType(ExecuteType.STANDALONE.name());
+ req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor");
+ break;
+ case MAP_REDUCE:
+ req.setExecuteType(ExecuteType.MAP_REDUCE.name());
+ req.setProcessorInfo("com.github.kfcfans.oms.processors.TestMapReduceProcessor");
+ break;
+ case BROADCAST:
+ req.setExecuteType(ExecuteType.BROADCAST.name());
+ req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBroadcastProcessor");
+ break;
+ }
- Thread.sleep(500000);
+ return req;
}
diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java
new file mode 100644
index 00000000..8460cfb6
--- /dev/null
+++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java
@@ -0,0 +1,37 @@
+package com.github.kfcfans.oms.processors;
+
+import com.github.kfcfans.oms.worker.sdk.ProcessResult;
+import com.github.kfcfans.oms.worker.sdk.TaskContext;
+import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;
+
+import java.util.Map;
+
+/**
+ * 测试用的广播执行处理器
+ *
+ * @author tjq
+ * @since 2020/3/25
+ */
+public class TestBroadcastProcessor implements BroadcastProcessor {
+ @Override
+ public ProcessResult preProcess(TaskContext taskContext) throws Exception {
+ System.out.println("=============== TestBroadcastProcessor#preProcess ===============");
+ System.out.println("taskContext:" + taskContext);
+ return new ProcessResult(true, "preProcess success");
+ }
+
+ @Override
+ public ProcessResult postProcess(TaskContext taskContext, Map taskId2Result) throws Exception {
+ System.out.println("=============== TestBroadcastProcessor#postProcess ===============");
+ System.out.println("taskContext:" + taskContext);
+ System.out.println("taskId2Result:" + taskId2Result);
+ return new ProcessResult(true, "postProcess success");
+ }
+
+ @Override
+ public ProcessResult process(TaskContext context) throws Exception {
+ System.out.println("=============== TestBroadcastProcessor#process ===============");
+ System.out.println("taskContext:" + context);
+ return new ProcessResult(true, "processSuccess");
+ }
+}
diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java
index 1905c1a5..1c88ea47 100644
--- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java
+++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java
@@ -8,6 +8,7 @@ import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -22,7 +23,9 @@ public class TestMapReduceProcessor extends MapReduceProcessor {
@Override
public ProcessResult reduce(TaskContext taskContext, Map taskId2Result) {
System.out.println("============== TestMapReduceProcessor#reduce ==============");
- return new ProcessResult(true, "reduce success");
+ System.out.println("taskContext:" + taskContext);
+ System.out.println("taskId2Result:" + taskId2Result);
+ return new ProcessResult(true, "REDUCE_SUCCESS");
}
@Override
@@ -39,19 +42,25 @@ public class TestMapReduceProcessor extends MapReduceProcessor {
}
ProcessResult mapResult = map(subTasks, "MAP_TEST_TASK");
System.out.println("map result = " + mapResult);
+ return new ProcessResult(true, "MAP_SUCCESS");
}else {
System.out.println("start to process");
System.out.println(context.getSubTask());
+ return new ProcessResult(true, "PROCESS_SUCCESS");
}
- return new ProcessResult(true, "PROCESS_SUCCESS");
}
@ToString
@NoArgsConstructor
@AllArgsConstructor
- private static class TestSubTask {
+ private static class TestSubTask implements Serializable {
private String name;
private int age;
}
+
+ @Override
+ public void init() throws Exception {
+ System.out.println("============== TestMapReduceProcessor#init ==============");
+ }
}