diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ProcessorType.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ProcessorType.java index 5a40982c..c465b168 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ProcessorType.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ProcessorType.java @@ -13,7 +13,9 @@ import lombok.Getter; @AllArgsConstructor public enum ProcessorType { - EMBEDDED_JAVA(1, "内置JAVA处理器"); + EMBEDDED_JAVA(1, "内置JAVA处理器"), + SHELL(2, "SHELL脚本"), + PYTHON2(3, "Python2脚本"); private int v; private String des; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index f32d36d4..955fe510 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -65,7 +65,7 @@ public class ProcessorRunnable implements Runnable { if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName())) { // 广播执行:先选本机执行 preProcess,完成后TaskTracker再为所有Worker生成子Task - if (executeType == ExecuteType.BROADCAST) { + if (executeType == ExecuteType.BROADCAST && processor instanceof BroadcastProcessor) { BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq(); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ShellProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ShellProcessor.java new file mode 100644 index 00000000..bfc0a5af --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ShellProcessor.java @@ -0,0 +1,115 @@ +package com.github.kfcfans.oms.worker.core.executor; + +import com.github.kfcfans.oms.worker.sdk.ProcessResult; +import com.github.kfcfans.oms.worker.sdk.TaskContext; +import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor; +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; + +import java.io.*; +import java.util.Set; + +/** + * Shell 处理器 + * 由 ProcessorTracker 创建 + * + * @author tjq + * @since 2020/4/15 + */ +@Slf4j +public class ShellProcessor implements BasicProcessor { + + private Long instanceId; + // shell 脚本绝对路径 + private String scriptPath; + + private static final String SHELL_PREFIX = "#!/bin/"; + private static final String DEFAULT_ACTUATOR = "sh"; + + private static final String FILE_PATH_PATTERN = "~/.oms/script/shell/%d.sh"; + + private static final Set DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp"); + + public ShellProcessor(Long instanceId, String processorInfo) throws Exception { + + this.instanceId = instanceId; + this.scriptPath = String.format(FILE_PATH_PATTERN, instanceId); + + // 如果是下载连接,则从网络获取 + for (String protocol : DOWNLOAD_PROTOCOL) { + if (processorInfo.startsWith(protocol)) { + downloadShellScript(processorInfo); + return; + } + } + + // 如是只是单纯的 shell 命令,则将其补充为 shell 脚本 + if (!processorInfo.startsWith(SHELL_PREFIX)) { + processorInfo = SHELL_PREFIX + DEFAULT_ACTUATOR + System.lineSeparator() + processorInfo; + } + + // 写入本地文件 + File script = new File(scriptPath); + if (!script.exists()) { + File dir = new File(script.getParent()); + boolean success = dir.mkdirs(); + success = script.createNewFile(); + if (!success) { + throw new RuntimeException("create script file failed"); + } + } + try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw);) { + bw.write(processorInfo); + bw.flush(); + } + } + + @Override + public ProcessResult process(TaskContext context) throws Exception { + + // 1. 授权 + ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); + chmodPb.start().waitFor(); + + String chmod = "chmod +x " + scriptPath; + Process chmodProcess = Runtime.getRuntime().exec(chmod); + // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) + chmodProcess.waitFor(); + + // 2. 执行目标脚本 + ProcessBuilder pb = new ProcessBuilder("/bin/sh", scriptPath); + Process process = pb.start(); + String s; + StringBuilder inputSB = new StringBuilder(); + StringBuilder errorSB = new StringBuilder(); + try (BufferedReader stdInput = new BufferedReader(new InputStreamReader(process.getInputStream())); + BufferedReader stdError = new BufferedReader(new InputStreamReader(process.getErrorStream()))) { + while ((s = stdInput.readLine()) != null) { + inputSB.append(s); + } + while ((s = stdError.readLine()) != null) { + errorSB.append(s); + } + } + process.waitFor(); + + String result = null; + if (inputSB.length() > 0) { + result = "input:" + inputSB.toString() + " ; "; + } + if (errorSB.length() > 0) { + result = "error: " + errorSB.toString() + " ; "; + } + if (result == null) { + result = "PROCESS_SUCCESS"; + } + + log.debug("[ShellProcessor] process result for instance(instanceId={}) is {}.", instanceId, result); + return new ProcessResult(true, result); + } + + private void downloadShellScript(String url) { + // 1. 下载 + // 2. 读取前两位,获取解释器 + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index 4fcd59f6..19d90c30 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -10,6 +10,7 @@ import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.common.utils.SpringUtils; import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory; import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable; +import com.github.kfcfans.oms.worker.core.executor.ShellProcessor; import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; @@ -53,7 +54,7 @@ public class ProcessorTracker { /** * 创建 ProcessorTracker(其实就是创建了个执行用的线程池 T_T) */ - public ProcessorTracker(TaskTrackerStartTaskReq request) { + public ProcessorTracker(TaskTrackerStartTaskReq request) throws Exception { // 赋值 this.startTime = System.currentTimeMillis(); @@ -191,7 +192,7 @@ public class ProcessorTracker { } - private void initProcessor() { + private void initProcessor() throws Exception { ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType()); String processorInfo = instanceInfo.getProcessorInfo(); @@ -210,6 +211,13 @@ public class ProcessorTracker { if (processor == null) { processor = ProcessorBeanFactory.getInstance().getLocalProcessor(processorInfo); } + break; + case SHELL: + processor = new ShellProcessor(instanceId, instanceInfo.getProcessorInfo()); + break; + case PYTHON2: + + } if (processor == null) { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java index c608caee..7f50d4ec 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java @@ -21,8 +21,8 @@ public class ConnectionFactory { private static volatile DataSource dataSource; - private static final String DISK_JDBC_URL = "jdbc:h2:file:~/.h2/oms/oms_worker_db"; - private static final String MEMORY_JDBC_URL = "jdbc:h2:mem:~/.h2/oms/oms_worker_db"; + private static final String DISK_JDBC_URL = "jdbc:h2:file:~/.oms/h2/oms_worker_db"; + private static final String MEMORY_JDBC_URL = "jdbc:h2:mem:~/.oms/h2/oms_worker_db"; public static Connection getConnection() throws SQLException { return getDataSource().getConnection(); diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ScriptProcessorTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ScriptProcessorTest.java new file mode 100644 index 00000000..478104f0 --- /dev/null +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ScriptProcessorTest.java @@ -0,0 +1,23 @@ +package com.github.kfcfans.oms; + +import com.github.kfcfans.oms.worker.core.executor.ShellProcessor; +import org.junit.jupiter.api.Test; + +/** + * 测试脚本处理器 + * + * @author tjq + * @since 2020/4/15 + */ +public class ScriptProcessorTest { + + @Test + public void testShellProcessor() throws Exception { + ShellProcessor sp = new ShellProcessor(277L, "ls -a"); + sp.process(null); + + ShellProcessor sp2 = new ShellProcessor(277L, "pwd"); + sp2.process(null); + } + +}