support script processor

This commit is contained in:
tjq 2020-04-16 15:06:36 +08:00
parent 494c1aa18f
commit a8354f4cf4
22 changed files with 170 additions and 99 deletions

View File

@ -5,7 +5,7 @@ OhMyScheduler是一个分布式调度平台和分布式计算框架
* 支持单机、广播、**MapReduce**三种执行模式
* 支持任意的水平扩展,性能强劲无上限
* 仅依赖数据库,部署简单,上手容易,开发高效,仅需几行代码即可获得整个集群的分布式计算能力。
* 支持SpringBean、普通Java类内置/外置、Shell、Python等处理器(开发中...马上实现)
* 支持SpringBean、普通Java类内置/外置、Shell、Python等处理器
# 部署
### 环境要求

View File

@ -15,7 +15,7 @@ public enum ProcessorType {
EMBEDDED_JAVA(1, "内置JAVA处理器"),
SHELL(2, "SHELL脚本"),
PYTHON2(3, "Python2脚本");
PYTHON(3, "Python脚本");
private int v;
private String des;

View File

@ -23,6 +23,7 @@
<kryo.version>5.0.0-RC5</kryo.version>
<fastjson.version>1.2.68</fastjson.version>
<okhttp.version>4.4.1</okhttp.version>
<commons.io.version>2.6</commons.io.version>
</properties>
<dependencies>
@ -91,6 +92,14 @@
<version>${fastjson.version}</version>
</dependency>
<!-- commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons.io.version}</version>
</dependency>
<!-- 开发阶段输出日志 -->
<dependency>

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.oms.worker.core.classloader;
import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;

View File

@ -11,11 +11,11 @@ import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor;
import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;
import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.oms.worker.core.processor.sdk.BroadcastProcessor;
import com.github.kfcfans.oms.worker.core.processor.sdk.MapReduceProcessor;
import com.google.common.base.Stopwatch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.worker.sdk;
package com.github.kfcfans.oms.worker.core.processor;
import lombok.*;

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.worker.sdk;
package com.github.kfcfans.oms.worker.core.processor;
import lombok.Getter;
import lombok.Setter;

View File

@ -0,0 +1,24 @@
package com.github.kfcfans.oms.worker.core.processor.built;
/**
* Python 处理器
*
* @author tjq
* @since 2020/4/16
*/
public class PythonProcessor extends ScriptProcessor {
public PythonProcessor(Long instanceId, String processorInfo) throws Exception {
super(instanceId, processorInfo);
}
@Override
protected String genScriptPath(Long instanceId) {
return String.format("~/oms/script/python/%d.py", instanceId);
}
@Override
protected String fetchRunCommand() {
return "python";
}
}

View File

@ -1,64 +1,58 @@
package com.github.kfcfans.oms.worker.core.executor;
package com.github.kfcfans.oms.worker.core.processor.built;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import java.io.*;
import java.net.URL;
import java.util.Set;
/**
* Shell 处理器
* ProcessorTracker 创建
* 脚本处理器
*
* @author tjq
* @since 2020/4/15
* @since 2020/4/16
*/
@Slf4j
public class ShellProcessor implements BasicProcessor {
public abstract class ScriptProcessor implements BasicProcessor {
private Long instanceId;
// shell 脚本绝对路径
// 脚本绝对路径
private String scriptPath;
private static final String SHELL_PREFIX = "#!/bin/";
private static final String DEFAULT_ACTUATOR = "sh";
private static final String FILE_PATH_PATTERN = "~/.oms/script/shell/%d.sh";
private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
public ShellProcessor(Long instanceId, String processorInfo) throws Exception {
public ScriptProcessor(Long instanceId, String processorInfo) throws Exception {
this.instanceId = instanceId;
this.scriptPath = String.format(FILE_PATH_PATTERN, instanceId);
this.scriptPath = genScriptPath(instanceId);
// 如果是下载连接则从网络获取
for (String protocol : DOWNLOAD_PROTOCOL) {
if (processorInfo.startsWith(protocol)) {
downloadShellScript(processorInfo);
File script = new File(scriptPath);
if (script.exists()) {
return;
}
}
// 如是只是单纯的 shell 命令则将其补充为 shell 脚本
if (!processorInfo.startsWith(SHELL_PREFIX)) {
processorInfo = SHELL_PREFIX + DEFAULT_ACTUATOR + System.lineSeparator() + processorInfo;
}
// 写入本地文件
File script = new File(scriptPath);
if (!script.exists()) {
File dir = new File(script.getParent());
boolean success = dir.mkdirs();
success = script.createNewFile();
if (!success) {
throw new RuntimeException("create script file failed");
}
// 如果是下载连接则从网络获取
for (String protocol : DOWNLOAD_PROTOCOL) {
if (processorInfo.startsWith(protocol)) {
FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000);
return;
}
try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw);) {
}
// 持久化到本地
try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) {
bw.write(processorInfo);
bw.flush();
}
@ -69,15 +63,11 @@ public class ShellProcessor implements BasicProcessor {
// 1. 授权
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
// 等待返回这里不可能导致死锁shell产生大量数据可能导致死锁
chmodPb.start().waitFor();
String chmod = "chmod +x " + scriptPath;
Process chmodProcess = Runtime.getRuntime().exec(chmod);
// 等待返回这里不可能导致死锁shell产生大量数据可能导致死锁
chmodProcess.waitFor();
// 2. 执行目标脚本
ProcessBuilder pb = new ProcessBuilder("/bin/sh", scriptPath);
ProcessBuilder pb = new ProcessBuilder(fetchRunCommand(), scriptPath);
Process process = pb.start();
String s;
StringBuilder inputSB = new StringBuilder();
@ -108,8 +98,16 @@ public class ShellProcessor implements BasicProcessor {
return new ProcessResult(true, result);
}
private void downloadShellScript(String url) {
// 1. 下载
// 2. 读取前两位获取解释器
}
/**
* 生成绝对脚本路径
* @param instanceId 任务实例ID作为文件名称使用JobId会有更改不生效的问题
* @return 文件名称
*/
protected abstract String genScriptPath(Long instanceId);
/**
* 获取运行命令egshell返回 /bin/sh
* @return 执行脚本的命令
*/
protected abstract String fetchRunCommand();
}

View File

@ -0,0 +1,29 @@
package com.github.kfcfans.oms.worker.core.processor.built;
import lombok.extern.slf4j.Slf4j;
/**
* Shell 处理器
* ProcessorTracker 创建
*
* @author tjq
* @since 2020/4/15
*/
@Slf4j
public class ShellProcessor extends ScriptProcessor {
public ShellProcessor(Long instanceId, String processorInfo) throws Exception {
super(instanceId, processorInfo);
}
@Override
protected String genScriptPath(Long instanceId) {
return String.format("~/oms/script/shell/%d.sh", instanceId);
}
@Override
protected String fetchRunCommand() {
return "/bin/sh";
}
}

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.oms.worker.sdk.api;
package com.github.kfcfans.oms.worker.core.processor.sdk;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
/**
* 基础的处理器适用于单机执行

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.oms.worker.sdk.api;
package com.github.kfcfans.oms.worker.core.processor.sdk;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import java.util.Map;

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.worker.sdk.api;
package com.github.kfcfans.oms.worker.core.processor.sdk;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
@ -10,8 +10,8 @@ import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;

View File

@ -10,13 +10,14 @@ import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory;
import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable;
import com.github.kfcfans.oms.worker.core.executor.ShellProcessor;
import com.github.kfcfans.oms.worker.core.processor.built.PythonProcessor;
import com.github.kfcfans.oms.worker.core.processor.built.ShellProcessor;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
@ -213,11 +214,14 @@ public class ProcessorTracker {
}
break;
case SHELL:
processor = new ShellProcessor(instanceId, instanceInfo.getProcessorInfo());
processor = new ShellProcessor(instanceId, processorInfo);
break;
case PYTHON2:
case PYTHON:
processor = new PythonProcessor(instanceId, processorInfo);
break;
default:
log.warn("[ProcessorRunnable-{}] unknown processor type: {}.", instanceId, processorType);
throw new IllegalArgumentException("unknown processor type of " + processorType);
}
if (processor == null) {

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms;
import com.github.kfcfans.oms.worker.core.executor.ShellProcessor;
import com.github.kfcfans.oms.worker.core.processor.built.PythonProcessor;
import com.github.kfcfans.oms.worker.core.processor.built.ShellProcessor;
import org.junit.jupiter.api.Test;
/**
@ -12,12 +13,24 @@ import org.junit.jupiter.api.Test;
public class ScriptProcessorTest {
@Test
public void testShellProcessor() throws Exception {
ShellProcessor sp = new ShellProcessor(277L, "ls -a");
public void testLocalShellProcessor() throws Exception {
ShellProcessor sp = new ShellProcessor(1L, "ls -a");
sp.process(null);
ShellProcessor sp2 = new ShellProcessor(277L, "pwd");
ShellProcessor sp2 = new ShellProcessor(2777L, "pwd");
sp2.process(null);
}
@Test
public void testLocalPythonProcessor() throws Exception {
PythonProcessor pp = new PythonProcessor(2L, "print 'Hello World!'");
pp.process(null);
}
@Test
public void testNetShellProcessor() throws Exception {
ShellProcessor sp = new ShellProcessor(18L, "http://localhost:8080/test/test.sh");
sp.process(null);
}
}

View File

@ -1,16 +1,9 @@
package com.github.kfcfans.oms.processors;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor;
import com.google.common.collect.Lists;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
/**
* 测试用的基础处理器

View File

@ -1,9 +1,9 @@
package com.github.kfcfans.oms.processors;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BroadcastProcessor;
import java.util.Map;

View File

@ -1,16 +1,15 @@
package com.github.kfcfans.oms.processors;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.MapReduceProcessor;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
import java.util.List;
import java.util.Map;

View File

@ -1,8 +1,8 @@
package com.github.kfcfans.oms.processors.demo;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
/**
* 示例-单机任务处理器

View File

@ -1,8 +1,8 @@
package com.github.kfcfans.oms.processors.demo;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BroadcastProcessor;
import java.util.Map;

View File

@ -1,8 +1,8 @@
package com.github.kfcfans.oms.processors.demo;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.MapReduceProcessor;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;

View File

@ -22,6 +22,8 @@
* 固定延迟 -> 填写整数,单位毫秒
* 执行配置由执行类型单机、广播和MapReduce、处理器类型和处理器参数组成后两项相互关联。
* 内置Java处理器 -> 填写该处理器的全限定类名eg, `com.github.kfcfans.oms.processors.demo.MapReduceProcessorDemo`
* SHELL -> 填写需要处理的脚本直接复制文件内容或脚本下载连接http://xxx
* PYTHON -> 填写完整的python脚本或下载连接http://xxx
* 运行配置
* 最大实例数:该任务同时执行的数量(任务和实例就像是类和对象的关系,任务被调度执行后被称为实例)
@ -54,7 +56,7 @@
>搭载处理器的宿主应用需要添加`oh-my-scheduler-worker`依赖。
### 单机处理器
>单机执行的策略下server会在所有可用worker中选取健康度最佳的机器进行执行。单机执行任务需要实现接口`com.github.kfcfans.oms.worker.sdk.api.BasicProcessor`,代码示例如下:
>单机执行的策略下server会在所有可用worker中选取健康度最佳的机器进行执行。单机执行任务需要实现接口`com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor`,代码示例如下:
```java
public class BasicProcessorDemo implements BasicProcessor {