From d44f818b81d26a2b4a05db34d5ce5d598f19093d Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 25 Mar 2020 21:24:47 +0800 Subject: [PATCH] finished simple test for TaskTracker(MapReduce Mode) and change serialize framwork to fst because of fastjson's indian --- oh-my-scheduler-worker/pom.xml | 10 ++--- .../oms/worker/actors/TaskTrackerActor.java | 2 + .../worker/common/utils/SerializerUtils.java | 35 ++++++++++++++++++ .../core/executor/ProcessorRunnable.java | 6 ++- .../worker/persistence/SimpleTaskQuery.java | 5 +++ .../pojo/request/ProcessorMapTaskRequest.java | 5 +-- .../pojo/request/TaskTrackerStartTaskReq.java | 2 + .../github/kfcfans/oms/TaskTrackerTest.java | 31 ++++++++++++++-- .../processors/TestBroadcastProcessor.java | 37 +++++++++++++++++++ .../processors/TestMapReduceProcessor.java | 15 ++++++-- 10 files changed, 131 insertions(+), 17 deletions(-) create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SerializerUtils.java create mode 100644 oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java diff --git a/oh-my-scheduler-worker/pom.xml b/oh-my-scheduler-worker/pom.xml index 4345c17a..eea81ec8 100644 --- a/oh-my-scheduler-worker/pom.xml +++ b/oh-my-scheduler-worker/pom.xml @@ -20,8 +20,8 @@ 1.4.200 3.4.2 28.2-jre - 1.2.58 5.6.1 + 2.56 @@ -68,11 +68,11 @@ ${guava.version} - + - com.alibaba - fastjson - ${fastjson.version} + de.ruedigermoeller + fst + ${fst.version} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java index 76490786..8ab946af 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java @@ -76,6 +76,8 @@ public class TaskTrackerActor extends AbstractActor { subTask.setTaskName(req.getTaskName()); subTask.setTaskId(originSubTask.getTaskId()); subTask.setTaskContent(originSubTask.getTaskContent()); + + subTaskList.add(subTask); }); success = taskTracker.addTask(subTaskList); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SerializerUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SerializerUtils.java new file mode 100644 index 00000000..f324bd01 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SerializerUtils.java @@ -0,0 +1,35 @@ +package com.github.kfcfans.oms.worker.common.utils; + +import org.nustaq.serialization.FSTConfiguration; + +import java.nio.charset.StandardCharsets; + +/** + * 序列化框架 + * + * @author tjq + * @since 2020/3/25 + */ +public class SerializerUtils { + + private static FSTConfiguration conf = FSTConfiguration.createDefaultConfiguration(); + + public static byte[] serialize(Object obj) { + return conf.asByteArray(obj); + } + + public static Object deSerialized(byte[] bytes) { + return conf.asObject(bytes); + } + + public static String toJSON(Object object) { + if (object == null) { + return null; + } + try { + return new String(serialize(object), StandardCharsets.UTF_8); + }catch (Exception ignore) { + } + return null; + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index 93ffa47c..9ca891db 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -1,12 +1,12 @@ package com.github.kfcfans.oms.worker.core.executor; import akka.actor.ActorSelection; -import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.ProcessorType; import com.github.kfcfans.oms.worker.common.ThreadLocalStore; import com.github.kfcfans.oms.worker.common.constants.TaskConstant; import com.github.kfcfans.oms.worker.common.constants.TaskStatus; +import com.github.kfcfans.oms.worker.common.utils.SerializerUtils; import com.github.kfcfans.oms.worker.common.utils.SpringUtils; import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory; import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; @@ -54,7 +54,7 @@ public class ProcessorRunnable implements Runnable { TaskContext taskContext = new TaskContext(); BeanUtils.copyProperties(request, taskContext); if (request.getSubTaskContent() != null && request.getSubTaskContent().length > 0) { - taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent())); + taskContext.setSubTask(SerializerUtils.deSerialized(request.getSubTaskContent())); } ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext); @@ -104,6 +104,8 @@ public class ProcessorRunnable implements Runnable { ProcessResult lastResult; Map taskId2ResultMap = TaskPersistenceService.INSTANCE.getTaskId2ResultMap(instanceId); + // 去除本任务 + taskId2ResultMap.remove(TaskConstant.LAST_TASK_ID); try { switch (executeType) { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java index 351a88f0..89afb841 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java @@ -57,6 +57,11 @@ public class SimpleTaskQuery { } String substring = sb.substring(0, sb.length() - LINK.length()); + + if (!StringUtils.isEmpty(otherCondition)) { + substring += otherCondition; + } + if (limit != null) { substring = substring + " limit " + limit; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java index d3c684bb..0bba1e5d 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java @@ -1,7 +1,6 @@ package com.github.kfcfans.oms.worker.pojo.request; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; +import com.github.kfcfans.oms.worker.common.utils.SerializerUtils; import com.github.kfcfans.oms.worker.sdk.TaskContext; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; @@ -44,7 +43,7 @@ public class ProcessorMapTaskRequest implements Serializable { // 不同执行线程之间,前缀(taskId)不同,该ID可以保证分布式唯一 String subTaskId = taskContext.getTaskId() + "." + i; // 写入类名,方便反序列化 - byte[] content = JSON.toJSONBytes(subTaskList.get(i), SerializerFeature.WriteClassName); + byte[] content = SerializerUtils.serialize(subTaskList.get(i)); subTasks.add(new SubTask(subTaskId, content)); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java index 5377494a..99711eb4 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java @@ -48,6 +48,7 @@ public class TaskTrackerStartTaskReq implements Serializable { public TaskTrackerStartTaskReq(JobInstanceInfo instanceInfo, TaskDO task) { + jobId = instanceInfo.getJobId(); instanceId = instanceInfo.getInstanceId(); processorType = instanceInfo.getProcessorType(); @@ -60,6 +61,7 @@ public class TaskTrackerStartTaskReq implements Serializable { jobParams = instanceInfo.getJobParams(); instanceParams = instanceInfo.getInstanceParams(); + taskId = task.getTaskId(); taskName = task.getTaskName(); subTaskContent = task.getTaskContent(); diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java index c8f71242..2faf433c 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java @@ -41,23 +41,46 @@ public class TaskTrackerTest { @Test public void testStandaloneJob() throws Exception { + remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.STANDALONE), null); + Thread.sleep(500000); + } + + @Test + public void testMapReduceJob() throws Exception { + remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.MAP_REDUCE), null); + Thread.sleep(500000); + } + + private static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType) { ServerScheduleJobReq req = new ServerScheduleJobReq(); req.setJobId("1"); req.setInstanceId("10086"); req.setAllWorkerAddress(NetUtils.getLocalHost()); - req.setExecuteType(ExecuteType.STANDALONE.name()); + req.setJobParams("this is job Params"); req.setInstanceParams("this is instance Params"); req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name()); - req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor"); req.setTaskRetryNum(3); req.setThreadConcurrency(5); req.setTimeLimit(500000); - remoteTaskTracker.tell(req, null); + switch (executeType) { + case STANDALONE: + req.setExecuteType(ExecuteType.STANDALONE.name()); + req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor"); + break; + case MAP_REDUCE: + req.setExecuteType(ExecuteType.MAP_REDUCE.name()); + req.setProcessorInfo("com.github.kfcfans.oms.processors.TestMapReduceProcessor"); + break; + case BROADCAST: + req.setExecuteType(ExecuteType.BROADCAST.name()); + req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBroadcastProcessor"); + break; + } - Thread.sleep(500000); + return req; } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java new file mode 100644 index 00000000..8460cfb6 --- /dev/null +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java @@ -0,0 +1,37 @@ +package com.github.kfcfans.oms.processors; + +import com.github.kfcfans.oms.worker.sdk.ProcessResult; +import com.github.kfcfans.oms.worker.sdk.TaskContext; +import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor; + +import java.util.Map; + +/** + * 测试用的广播执行处理器 + * + * @author tjq + * @since 2020/3/25 + */ +public class TestBroadcastProcessor implements BroadcastProcessor { + @Override + public ProcessResult preProcess(TaskContext taskContext) throws Exception { + System.out.println("=============== TestBroadcastProcessor#preProcess ==============="); + System.out.println("taskContext:" + taskContext); + return new ProcessResult(true, "preProcess success"); + } + + @Override + public ProcessResult postProcess(TaskContext taskContext, Map taskId2Result) throws Exception { + System.out.println("=============== TestBroadcastProcessor#postProcess ==============="); + System.out.println("taskContext:" + taskContext); + System.out.println("taskId2Result:" + taskId2Result); + return new ProcessResult(true, "postProcess success"); + } + + @Override + public ProcessResult process(TaskContext context) throws Exception { + System.out.println("=============== TestBroadcastProcessor#process ==============="); + System.out.println("taskContext:" + context); + return new ProcessResult(true, "processSuccess"); + } +} diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java index 1905c1a5..1c88ea47 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java @@ -8,6 +8,7 @@ import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.ToString; +import java.io.Serializable; import java.util.List; import java.util.Map; @@ -22,7 +23,9 @@ public class TestMapReduceProcessor extends MapReduceProcessor { @Override public ProcessResult reduce(TaskContext taskContext, Map taskId2Result) { System.out.println("============== TestMapReduceProcessor#reduce =============="); - return new ProcessResult(true, "reduce success"); + System.out.println("taskContext:" + taskContext); + System.out.println("taskId2Result:" + taskId2Result); + return new ProcessResult(true, "REDUCE_SUCCESS"); } @Override @@ -39,19 +42,25 @@ public class TestMapReduceProcessor extends MapReduceProcessor { } ProcessResult mapResult = map(subTasks, "MAP_TEST_TASK"); System.out.println("map result = " + mapResult); + return new ProcessResult(true, "MAP_SUCCESS"); }else { System.out.println("start to process"); System.out.println(context.getSubTask()); + return new ProcessResult(true, "PROCESS_SUCCESS"); } - return new ProcessResult(true, "PROCESS_SUCCESS"); } @ToString @NoArgsConstructor @AllArgsConstructor - private static class TestSubTask { + private static class TestSubTask implements Serializable { private String name; private int age; } + + @Override + public void init() throws Exception { + System.out.println("============== TestMapReduceProcessor#init =============="); + } }