diff --git a/README.md b/README.md index eb98be05..ac3b9a52 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ OhMyScheduler是一个分布式调度平台和分布式计算框架 * 支持单机、广播、**MapReduce**三种执行模式 * 支持任意的水平扩展,性能强劲无上限 * 仅依赖数据库,部署简单,上手容易,开发高效,仅需几行代码即可获得整个集群的分布式计算能力。 -* 支持SpringBean、普通Java类(内置/外置)、Shell、Python等处理器(开发中...马上实现) +* 支持SpringBean、普通Java类(内置/外置)、Shell、Python等处理器 # 部署 ### 环境要求 diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ProcessorType.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ProcessorType.java index c465b168..5dc18f38 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ProcessorType.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ProcessorType.java @@ -15,7 +15,7 @@ public enum ProcessorType { EMBEDDED_JAVA(1, "内置JAVA处理器"), SHELL(2, "SHELL脚本"), - PYTHON2(3, "Python2脚本"); + PYTHON(3, "Python脚本"); private int v; private String des; diff --git a/oh-my-scheduler-worker/pom.xml b/oh-my-scheduler-worker/pom.xml index e8ed26aa..e5387236 100644 --- a/oh-my-scheduler-worker/pom.xml +++ b/oh-my-scheduler-worker/pom.xml @@ -23,6 +23,7 @@ 5.0.0-RC5 1.2.68 4.4.1 + 2.6 @@ -91,6 +92,14 @@ ${fastjson.version} + + + commons-io + commons-io + ${commons.io.version} + + + diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/classloader/ProcessorBeanFactory.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/classloader/ProcessorBeanFactory.java index 436322c2..a591fdd3 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/classloader/ProcessorBeanFactory.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/classloader/ProcessorBeanFactory.java @@ -1,6 +1,6 @@ package com.github.kfcfans.oms.worker.core.classloader; -import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor; +import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index 955fe510..305d0f43 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -11,11 +11,11 @@ import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq; import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; -import com.github.kfcfans.oms.worker.sdk.ProcessResult; -import com.github.kfcfans.oms.worker.sdk.TaskContext; -import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor; -import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor; -import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; +import com.github.kfcfans.oms.worker.core.processor.sdk.BroadcastProcessor; +import com.github.kfcfans.oms.worker.core.processor.sdk.MapReduceProcessor; import com.google.common.base.Stopwatch; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/ProcessResult.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/ProcessResult.java similarity index 82% rename from oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/ProcessResult.java rename to oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/ProcessResult.java index 7bc6f13a..065d4377 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/ProcessResult.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/ProcessResult.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.oms.worker.sdk; +package com.github.kfcfans.oms.worker.core.processor; import lombok.*; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java similarity index 95% rename from oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java rename to oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java index 5def3831..b55b6daf 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.oms.worker.sdk; +package com.github.kfcfans.oms.worker.core.processor; import lombok.Getter; import lombok.Setter; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/PythonProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/PythonProcessor.java new file mode 100644 index 00000000..a2a79eae --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/PythonProcessor.java @@ -0,0 +1,24 @@ +package com.github.kfcfans.oms.worker.core.processor.built; + +/** + * Python 处理器 + * + * @author tjq + * @since 2020/4/16 + */ +public class PythonProcessor extends ScriptProcessor { + + public PythonProcessor(Long instanceId, String processorInfo) throws Exception { + super(instanceId, processorInfo); + } + + @Override + protected String genScriptPath(Long instanceId) { + return String.format("~/oms/script/python/%d.py", instanceId); + } + + @Override + protected String fetchRunCommand() { + return "python"; + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ShellProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ScriptProcessor.java similarity index 56% rename from oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ShellProcessor.java rename to oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ScriptProcessor.java index bfc0a5af..3a90292b 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ShellProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ScriptProcessor.java @@ -1,64 +1,58 @@ -package com.github.kfcfans.oms.worker.core.executor; +package com.github.kfcfans.oms.worker.core.processor.built; -import com.github.kfcfans.oms.worker.sdk.ProcessResult; -import com.github.kfcfans.oms.worker.sdk.TaskContext; -import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; import java.io.*; +import java.net.URL; import java.util.Set; /** - * Shell 处理器 - * 由 ProcessorTracker 创建 + * 脚本处理器 * * @author tjq - * @since 2020/4/15 + * @since 2020/4/16 */ @Slf4j -public class ShellProcessor implements BasicProcessor { +public abstract class ScriptProcessor implements BasicProcessor { private Long instanceId; - // shell 脚本绝对路径 + // 脚本绝对路径 private String scriptPath; - private static final String SHELL_PREFIX = "#!/bin/"; - private static final String DEFAULT_ACTUATOR = "sh"; - - private static final String FILE_PATH_PATTERN = "~/.oms/script/shell/%d.sh"; - private static final Set DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp"); - public ShellProcessor(Long instanceId, String processorInfo) throws Exception { + public ScriptProcessor(Long instanceId, String processorInfo) throws Exception { this.instanceId = instanceId; - this.scriptPath = String.format(FILE_PATH_PATTERN, instanceId); + this.scriptPath = genScriptPath(instanceId); + + File script = new File(scriptPath); + if (script.exists()) { + return; + } + + File dir = new File(script.getParent()); + boolean success = dir.mkdirs(); + success = script.createNewFile(); + if (!success) { + throw new RuntimeException("create script file failed"); + } // 如果是下载连接,则从网络获取 for (String protocol : DOWNLOAD_PROTOCOL) { if (processorInfo.startsWith(protocol)) { - downloadShellScript(processorInfo); + FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000); return; } } - // 如是只是单纯的 shell 命令,则将其补充为 shell 脚本 - if (!processorInfo.startsWith(SHELL_PREFIX)) { - processorInfo = SHELL_PREFIX + DEFAULT_ACTUATOR + System.lineSeparator() + processorInfo; - } - - // 写入本地文件 - File script = new File(scriptPath); - if (!script.exists()) { - File dir = new File(script.getParent()); - boolean success = dir.mkdirs(); - success = script.createNewFile(); - if (!success) { - throw new RuntimeException("create script file failed"); - } - } - try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw);) { + // 持久化到本地 + try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) { bw.write(processorInfo); bw.flush(); } @@ -69,15 +63,11 @@ public class ShellProcessor implements BasicProcessor { // 1. 授权 ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); + // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) chmodPb.start().waitFor(); - String chmod = "chmod +x " + scriptPath; - Process chmodProcess = Runtime.getRuntime().exec(chmod); - // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) - chmodProcess.waitFor(); - // 2. 执行目标脚本 - ProcessBuilder pb = new ProcessBuilder("/bin/sh", scriptPath); + ProcessBuilder pb = new ProcessBuilder(fetchRunCommand(), scriptPath); Process process = pb.start(); String s; StringBuilder inputSB = new StringBuilder(); @@ -108,8 +98,16 @@ public class ShellProcessor implements BasicProcessor { return new ProcessResult(true, result); } - private void downloadShellScript(String url) { - // 1. 下载 - // 2. 读取前两位,获取解释器 - } + /** + * 生成绝对脚本路径 + * @param instanceId 任务实例ID,作为文件名称(使用JobId会有更改不生效的问题) + * @return 文件名称 + */ + protected abstract String genScriptPath(Long instanceId); + + /** + * 获取运行命令(eg,shell返回 /bin/sh) + * @return 执行脚本的命令 + */ + protected abstract String fetchRunCommand(); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ShellProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ShellProcessor.java new file mode 100644 index 00000000..f1b017fb --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ShellProcessor.java @@ -0,0 +1,29 @@ +package com.github.kfcfans.oms.worker.core.processor.built; + +import lombok.extern.slf4j.Slf4j; + +/** + * Shell 处理器 + * 由 ProcessorTracker 创建 + * + * @author tjq + * @since 2020/4/15 + */ +@Slf4j +public class ShellProcessor extends ScriptProcessor { + + + public ShellProcessor(Long instanceId, String processorInfo) throws Exception { + super(instanceId, processorInfo); + } + + @Override + protected String genScriptPath(Long instanceId) { + return String.format("~/oms/script/shell/%d.sh", instanceId); + } + + @Override + protected String fetchRunCommand() { + return "/bin/sh"; + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BasicProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BasicProcessor.java similarity index 71% rename from oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BasicProcessor.java rename to oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BasicProcessor.java index 0440cc72..141e602e 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BasicProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BasicProcessor.java @@ -1,7 +1,7 @@ -package com.github.kfcfans.oms.worker.sdk.api; +package com.github.kfcfans.oms.worker.core.processor.sdk; -import com.github.kfcfans.oms.worker.sdk.TaskContext; -import com.github.kfcfans.oms.worker.sdk.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; /** * 基础的处理器,适用于单机执行 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BroadcastProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BroadcastProcessor.java similarity index 76% rename from oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BroadcastProcessor.java rename to oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BroadcastProcessor.java index 7c9d5b52..61351be4 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BroadcastProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BroadcastProcessor.java @@ -1,7 +1,7 @@ -package com.github.kfcfans.oms.worker.sdk.api; +package com.github.kfcfans.oms.worker.core.processor.sdk; -import com.github.kfcfans.oms.worker.sdk.ProcessResult; -import com.github.kfcfans.oms.worker.sdk.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; import java.util.Map; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/MapReduceProcessor.java similarity index 94% rename from oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java rename to oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/MapReduceProcessor.java index 985a6f2b..f0aa3baf 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/MapReduceProcessor.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.oms.worker.sdk.api; +package com.github.kfcfans.oms.worker.core.processor.sdk; import akka.actor.ActorSelection; import akka.pattern.Patterns; @@ -10,8 +10,8 @@ import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest; import com.github.kfcfans.common.response.AskResponse; -import com.github.kfcfans.oms.worker.sdk.TaskContext; -import com.github.kfcfans.oms.worker.sdk.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index 19d90c30..31fa0412 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -10,13 +10,14 @@ import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.common.utils.SpringUtils; import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory; import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable; -import com.github.kfcfans.oms.worker.core.executor.ShellProcessor; +import com.github.kfcfans.oms.worker.core.processor.built.PythonProcessor; +import com.github.kfcfans.oms.worker.core.processor.built.ShellProcessor; import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; -import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor; +import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; @@ -213,11 +214,14 @@ public class ProcessorTracker { } break; case SHELL: - processor = new ShellProcessor(instanceId, instanceInfo.getProcessorInfo()); + processor = new ShellProcessor(instanceId, processorInfo); break; - case PYTHON2: - - + case PYTHON: + processor = new PythonProcessor(instanceId, processorInfo); + break; + default: + log.warn("[ProcessorRunnable-{}] unknown processor type: {}.", instanceId, processorType); + throw new IllegalArgumentException("unknown processor type of " + processorType); } if (processor == null) { diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ScriptProcessorTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ScriptProcessorTest.java index 478104f0..d61dc2af 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ScriptProcessorTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ScriptProcessorTest.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms; -import com.github.kfcfans.oms.worker.core.executor.ShellProcessor; +import com.github.kfcfans.oms.worker.core.processor.built.PythonProcessor; +import com.github.kfcfans.oms.worker.core.processor.built.ShellProcessor; import org.junit.jupiter.api.Test; /** @@ -12,12 +13,24 @@ import org.junit.jupiter.api.Test; public class ScriptProcessorTest { @Test - public void testShellProcessor() throws Exception { - ShellProcessor sp = new ShellProcessor(277L, "ls -a"); + public void testLocalShellProcessor() throws Exception { + ShellProcessor sp = new ShellProcessor(1L, "ls -a"); sp.process(null); - ShellProcessor sp2 = new ShellProcessor(277L, "pwd"); + ShellProcessor sp2 = new ShellProcessor(2777L, "pwd"); sp2.process(null); } + @Test + public void testLocalPythonProcessor() throws Exception { + PythonProcessor pp = new PythonProcessor(2L, "print 'Hello World!'"); + pp.process(null); + } + + @Test + public void testNetShellProcessor() throws Exception { + ShellProcessor sp = new ShellProcessor(18L, "http://localhost:8080/test/test.sh"); + sp.process(null); + } + } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBasicProcessor.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBasicProcessor.java index 63b1442a..b1996a04 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBasicProcessor.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBasicProcessor.java @@ -1,16 +1,9 @@ package com.github.kfcfans.oms.processors; import com.alibaba.fastjson.JSONObject; -import com.github.kfcfans.common.ExecuteType; -import com.github.kfcfans.common.ProcessorType; -import com.github.kfcfans.common.RemoteConstant; -import com.github.kfcfans.common.TimeExpressionType; -import com.github.kfcfans.common.request.ServerScheduleJobReq; -import com.github.kfcfans.common.utils.NetUtils; -import com.github.kfcfans.oms.worker.sdk.ProcessResult; -import com.github.kfcfans.oms.worker.sdk.TaskContext; -import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor; -import com.google.common.collect.Lists; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; /** * 测试用的基础处理器 diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java index 59554343..35427c7e 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java @@ -1,9 +1,9 @@ package com.github.kfcfans.oms.processors; import com.alibaba.fastjson.JSONObject; -import com.github.kfcfans.oms.worker.sdk.ProcessResult; -import com.github.kfcfans.oms.worker.sdk.TaskContext; -import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.BroadcastProcessor; import java.util.Map; diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java index 24b0b016..2649e130 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java @@ -1,16 +1,15 @@ package com.github.kfcfans.oms.processors; import com.alibaba.fastjson.JSONObject; -import com.github.kfcfans.oms.worker.sdk.ProcessResult; -import com.github.kfcfans.oms.worker.sdk.TaskContext; -import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.MapReduceProcessor; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.ToString; -import java.io.Serializable; import java.util.List; import java.util.Map; diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/demo/BasicProcessorDemo.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/demo/BasicProcessorDemo.java index 08c67fa3..e3c614a9 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/demo/BasicProcessorDemo.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/demo/BasicProcessorDemo.java @@ -1,8 +1,8 @@ package com.github.kfcfans.oms.processors.demo; -import com.github.kfcfans.oms.worker.sdk.ProcessResult; -import com.github.kfcfans.oms.worker.sdk.TaskContext; -import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; /** * 示例-单机任务处理器 diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/demo/BroadcastProcessorDemo.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/demo/BroadcastProcessorDemo.java index 7067b710..139c685b 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/demo/BroadcastProcessorDemo.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/demo/BroadcastProcessorDemo.java @@ -1,8 +1,8 @@ package com.github.kfcfans.oms.processors.demo; -import com.github.kfcfans.oms.worker.sdk.ProcessResult; -import com.github.kfcfans.oms.worker.sdk.TaskContext; -import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.BroadcastProcessor; import java.util.Map; diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/demo/MapReduceProcessorDemo.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/demo/MapReduceProcessorDemo.java index 2c1e1742..5104eeb6 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/demo/MapReduceProcessorDemo.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/demo/MapReduceProcessorDemo.java @@ -1,8 +1,8 @@ package com.github.kfcfans.oms.processors.demo; -import com.github.kfcfans.oms.worker.sdk.ProcessResult; -import com.github.kfcfans.oms.worker.sdk.TaskContext; -import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.MapReduceProcessor; import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; diff --git a/others/doc/DevelopmentGuide.md b/others/doc/DevelopmentGuide.md index 0f9f6235..28eb75f1 100644 --- a/others/doc/DevelopmentGuide.md +++ b/others/doc/DevelopmentGuide.md @@ -22,6 +22,8 @@ * 固定延迟 -> 填写整数,单位毫秒 * 执行配置:由执行类型(单机、广播和MapReduce)、处理器类型和处理器参数组成,后两项相互关联。 * 内置Java处理器 -> 填写该处理器的全限定类名(eg, `com.github.kfcfans.oms.processors.demo.MapReduceProcessorDemo`) + * SHELL -> 填写需要处理的脚本(直接复制文件内容)或脚本下载连接(http://xxx) + * PYTHON -> 填写完整的python脚本或下载连接(http://xxx) * 运行配置 * 最大实例数:该任务同时执行的数量(任务和实例就像是类和对象的关系,任务被调度执行后被称为实例) @@ -54,7 +56,7 @@ >搭载处理器的宿主应用需要添加`oh-my-scheduler-worker`依赖。 ### 单机处理器 ->单机执行的策略下,server会在所有可用worker中选取健康度最佳的机器进行执行。单机执行任务需要实现接口:`com.github.kfcfans.oms.worker.sdk.api.BasicProcessor`,代码示例如下: +>单机执行的策略下,server会在所有可用worker中选取健康度最佳的机器进行执行。单机执行任务需要实现接口:`com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor`,代码示例如下: ```java public class BasicProcessorDemo implements BasicProcessor {