supprt shell processor

This commit is contained in:
tjq 2020-04-15 22:41:40 +08:00
parent e1147c2ca1
commit 494c1aa18f
6 changed files with 154 additions and 6 deletions

View File

@ -13,7 +13,9 @@ import lombok.Getter;
@AllArgsConstructor
public enum ProcessorType {
EMBEDDED_JAVA(1, "内置JAVA处理器");
EMBEDDED_JAVA(1, "内置JAVA处理器"),
SHELL(2, "SHELL脚本"),
PYTHON2(3, "Python2脚本");
private int v;
private String des;

View File

@ -65,7 +65,7 @@ public class ProcessorRunnable implements Runnable {
if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName())) {
// 广播执行先选本机执行 preProcess完成后TaskTracker再为所有Worker生成子Task
if (executeType == ExecuteType.BROADCAST) {
if (executeType == ExecuteType.BROADCAST && processor instanceof BroadcastProcessor) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();

View File

@ -0,0 +1,115 @@
package com.github.kfcfans.oms.worker.core.executor;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.util.Set;
/**
* Shell 处理器
* ProcessorTracker 创建
*
* @author tjq
* @since 2020/4/15
*/
@Slf4j
public class ShellProcessor implements BasicProcessor {
private Long instanceId;
// shell 脚本绝对路径
private String scriptPath;
private static final String SHELL_PREFIX = "#!/bin/";
private static final String DEFAULT_ACTUATOR = "sh";
private static final String FILE_PATH_PATTERN = "~/.oms/script/shell/%d.sh";
private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
public ShellProcessor(Long instanceId, String processorInfo) throws Exception {
this.instanceId = instanceId;
this.scriptPath = String.format(FILE_PATH_PATTERN, instanceId);
// 如果是下载连接则从网络获取
for (String protocol : DOWNLOAD_PROTOCOL) {
if (processorInfo.startsWith(protocol)) {
downloadShellScript(processorInfo);
return;
}
}
// 如是只是单纯的 shell 命令则将其补充为 shell 脚本
if (!processorInfo.startsWith(SHELL_PREFIX)) {
processorInfo = SHELL_PREFIX + DEFAULT_ACTUATOR + System.lineSeparator() + processorInfo;
}
// 写入本地文件
File script = new File(scriptPath);
if (!script.exists()) {
File dir = new File(script.getParent());
boolean success = dir.mkdirs();
success = script.createNewFile();
if (!success) {
throw new RuntimeException("create script file failed");
}
}
try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw);) {
bw.write(processorInfo);
bw.flush();
}
}
@Override
public ProcessResult process(TaskContext context) throws Exception {
// 1. 授权
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
chmodPb.start().waitFor();
String chmod = "chmod +x " + scriptPath;
Process chmodProcess = Runtime.getRuntime().exec(chmod);
// 等待返回这里不可能导致死锁shell产生大量数据可能导致死锁
chmodProcess.waitFor();
// 2. 执行目标脚本
ProcessBuilder pb = new ProcessBuilder("/bin/sh", scriptPath);
Process process = pb.start();
String s;
StringBuilder inputSB = new StringBuilder();
StringBuilder errorSB = new StringBuilder();
try (BufferedReader stdInput = new BufferedReader(new InputStreamReader(process.getInputStream()));
BufferedReader stdError = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
while ((s = stdInput.readLine()) != null) {
inputSB.append(s);
}
while ((s = stdError.readLine()) != null) {
errorSB.append(s);
}
}
process.waitFor();
String result = null;
if (inputSB.length() > 0) {
result = "input:" + inputSB.toString() + " ; ";
}
if (errorSB.length() > 0) {
result = "error: " + errorSB.toString() + " ; ";
}
if (result == null) {
result = "PROCESS_SUCCESS";
}
log.debug("[ShellProcessor] process result for instance(instanceId={}) is {}.", instanceId, result);
return new ProcessResult(true, result);
}
private void downloadShellScript(String url) {
// 1. 下载
// 2. 读取前两位获取解释器
}
}

View File

@ -10,6 +10,7 @@ import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory;
import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable;
import com.github.kfcfans.oms.worker.core.executor.ShellProcessor;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
@ -53,7 +54,7 @@ public class ProcessorTracker {
/**
* 创建 ProcessorTracker其实就是创建了个执行用的线程池 T_T
*/
public ProcessorTracker(TaskTrackerStartTaskReq request) {
public ProcessorTracker(TaskTrackerStartTaskReq request) throws Exception {
// 赋值
this.startTime = System.currentTimeMillis();
@ -191,7 +192,7 @@ public class ProcessorTracker {
}
private void initProcessor() {
private void initProcessor() throws Exception {
ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType());
String processorInfo = instanceInfo.getProcessorInfo();
@ -210,6 +211,13 @@ public class ProcessorTracker {
if (processor == null) {
processor = ProcessorBeanFactory.getInstance().getLocalProcessor(processorInfo);
}
break;
case SHELL:
processor = new ShellProcessor(instanceId, instanceInfo.getProcessorInfo());
break;
case PYTHON2:
}
if (processor == null) {

View File

@ -21,8 +21,8 @@ public class ConnectionFactory {
private static volatile DataSource dataSource;
private static final String DISK_JDBC_URL = "jdbc:h2:file:~/.h2/oms/oms_worker_db";
private static final String MEMORY_JDBC_URL = "jdbc:h2:mem:~/.h2/oms/oms_worker_db";
private static final String DISK_JDBC_URL = "jdbc:h2:file:~/.oms/h2/oms_worker_db";
private static final String MEMORY_JDBC_URL = "jdbc:h2:mem:~/.oms/h2/oms_worker_db";
public static Connection getConnection() throws SQLException {
return getDataSource().getConnection();

View File

@ -0,0 +1,23 @@
package com.github.kfcfans.oms;
import com.github.kfcfans.oms.worker.core.executor.ShellProcessor;
import org.junit.jupiter.api.Test;
/**
* 测试脚本处理器
*
* @author tjq
* @since 2020/4/15
*/
public class ScriptProcessorTest {
@Test
public void testShellProcessor() throws Exception {
ShellProcessor sp = new ShellProcessor(277L, "ls -a");
sp.process(null);
ShellProcessor sp2 = new ShellProcessor(277L, "pwd");
sp2.process(null);
}
}