feat: remove build-in script processor

This commit is contained in:
tjq 2021-02-24 00:19:06 +08:00
parent 6dc20eee7d
commit a8e284ccbe
13 changed files with 110 additions and 275 deletions

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.powerjob.server.common.constans;
package com.github.kfcfans.powerjob.common;
import lombok.AllArgsConstructor;
import lombok.Getter;

View File

@ -14,9 +14,12 @@ import lombok.Getter;
public enum ProcessorType {
EMBEDDED_JAVA(1, "内置JAVA处理器"),
JAVA_CONTAINER(4, "Java容器"),
@Deprecated
SHELL(2, "SHELL脚本"),
PYTHON(3, "Python脚本"),
JAVA_CONTAINER(4, "Java容器");
@Deprecated
PYTHON(3, "Python脚本");
private final int v;
private final String des;

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.common.request.http;
import com.github.kfcfans.powerjob.common.DispatchStrategy;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
@ -127,7 +128,7 @@ public class SaveJobInfoRequest {
private String extra;
private Integer dispatchStrategy;
private DispatchStrategy dispatchStrategy;
private String lifecycle;
@ -143,4 +144,11 @@ public class SaveJobInfoRequest {
CommonUtils.requireNonNull(processorType, "processorType can't be empty");
CommonUtils.requireNonNull(timeExpressionType, "timeExpressionType can't be empty");
}
public DispatchStrategy getDispatchStrategy() {
if (dispatchStrategy == null) {
return DispatchStrategy.HEALTH_FIRST;
}
return dispatchStrategy;
}
}

View File

@ -66,10 +66,8 @@ public class JobInfoDO {
*/
private Integer processorType;
/**
* 执行器信息可能需要存储整个脚本文件
* 执行器信息
*/
@Lob
@Column
private String processorInfo;
/* ************************** 运行时配置 ************************** */

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.remote;
import com.github.kfcfans.powerjob.common.*;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.server.common.constans.DispatchStrategy;
import com.github.kfcfans.powerjob.common.DispatchStrategy;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;

View File

@ -7,16 +7,15 @@ import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.constans.DispatchStrategy;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.remote.DispatchService;
import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer;
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.remote.DispatchService;
import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import lombok.extern.slf4j.Slf4j;
@ -79,7 +78,7 @@ public class JobService {
jobInfoDO.setProcessorType(request.getProcessorType().getV());
jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV());
jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
jobInfoDO.setDispatchStrategy(DispatchStrategy.of(request.getDispatchStrategy()).getV());
jobInfoDO.setDispatchStrategy(request.getDispatchStrategy().getV());
// 填充默认值非空保护防止 NPE
fillDefaultValue(jobInfoDO);

View File

@ -0,0 +1,89 @@
package com.github.kfcfans.powerjob.server.web.controller;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.extension.LockService;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.persistence.criteria.Predicate;
import java.util.List;
import java.util.Set;
/**
* Help users upgrade from a low version of powerjob-server to a high version of powerjob-server
* v4 means that this interface was upgraded from version v3.x to v4.x, and so on
*
* @author tjq
* @since 2021/2/23
*/
@Slf4j
@RestController
@RequestMapping("/migrate")
public class MigrateController {
@Resource
private LockService lockService;
@Resource
private JobInfoRepository jobInfoRepository;
@GetMapping("/v4/script")
public ResultDTO<JSONObject> migrateScriptFromV3ToV4(Long appId) {
JSONObject resultLog = new JSONObject();
resultLog.put("docs", "https://www.yuque.com/powerjob/guidence/official_processor");
resultLog.put("tips", "please add the maven dependency of 'powerjob-official-processors'");
String lock = "migrateScriptFromV3ToV4-" + appId;
boolean getLock = lockService.tryLock(lock, 60000);
if (!getLock) {
return ResultDTO.failed("get lock failed, maybe other migrate job is running");
}
try {
Set<Long> convertedJobIds = Sets.newHashSet();
Specification<JobInfoDO> specification = (Specification<JobInfoDO>) (root, query, criteriaBuilder) -> {
List<Predicate> predicates = Lists.newLinkedList();
List<Integer> scriptJobTypes = Lists.newArrayList(ProcessorType.SHELL.getV(), ProcessorType.PYTHON.getV());
predicates.add(criteriaBuilder.equal(root.get("appId"), appId));
predicates.add(root.get("processorType").in(scriptJobTypes));
return query.where(predicates.toArray(new Predicate[0])).getRestriction();
};
List<JobInfoDO> scriptJobs = jobInfoRepository.findAll(specification);
resultLog.put("scriptJobsNum", scriptJobs.size());
resultLog.put("convertedJobIds", convertedJobIds);
log.info("[MigrateScriptFromV3ToV4] script job num: {}", scriptJobs.size());
scriptJobs.forEach(job -> {
ProcessorType oldProcessorType = ProcessorType.of(job.getProcessorType());
job.setJobParams(job.getProcessorInfo());
job.setProcessorType(ProcessorType.EMBEDDED_JAVA.getV());
if (oldProcessorType == ProcessorType.PYTHON) {
job.setProcessorInfo("tech.powerjob.official.processors.impl.script.PythonProcessor");
} else {
job.setProcessorInfo("tech.powerjob.official.processors.impl.script.ShellProcessor");
}
jobInfoRepository.saveAndFlush(job);
convertedJobIds.add(job.getId());
});
return ResultDTO.success(resultLog);
} finally {
lockService.unlock(lock);
}
}
}

View File

@ -5,7 +5,7 @@ import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.constans.DispatchStrategy;
import com.github.kfcfans.powerjob.common.DispatchStrategy;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.google.common.collect.Lists;

View File

@ -1,24 +0,0 @@
package com.github.kfcfans.powerjob.worker.core.processor.built;
/**
* Python 处理器
*
* @author tjq
* @since 2020/4/16
*/
public class PythonProcessor extends ScriptProcessor {
public PythonProcessor(Long instanceId, String processorInfo, long timeout) throws Exception {
super(instanceId, processorInfo, timeout);
}
@Override
protected String genScriptName() {
return String.format("python_%d.py", instanceId);
}
@Override
protected String fetchRunCommand() {
return "python";
}
}

View File

@ -1,147 +0,0 @@
package com.github.kfcfans.powerjob.worker.core.processor.built;
import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.*;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* 脚本处理器
*
* @author tjq
* @since 2020/4/16
*/
@Slf4j
public abstract class ScriptProcessor implements BasicProcessor {
protected final Long instanceId;
// 脚本绝对路径
private final String scriptPath;
private final long timeout;
private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
public ScriptProcessor(Long instanceId, String processorInfo, long timeout) throws Exception {
this.instanceId = instanceId;
this.scriptPath = OmsWorkerFileUtils.getScriptDir() + genScriptName();
this.timeout = timeout;
File script = new File(scriptPath);
if (script.exists()) {
return;
}
File dir = new File(script.getParent());
boolean success = dir.mkdirs();
if (!success) {
throw new RuntimeException("create script folder failed.");
}
success = script.createNewFile();
if (!success) {
throw new RuntimeException("create script file failed");
}
// 如果是下载链接则从网络获取
for (String protocol : DOWNLOAD_PROTOCOL) {
if (processorInfo.startsWith(protocol)) {
FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000);
return;
}
}
// 非下载链接 processInfo 生成可执行文件
try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) {
bw.write(processorInfo);
bw.flush();
}
}
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("SYSTEM===> ScriptProcessor start to process");
if (SystemUtils.IS_OS_WINDOWS) {
if (StringUtils.equals(fetchRunCommand(), "/bin/bash")) {
omsLogger.warn("Current OS is {} where shell scripts cannot run.", SystemUtils.OS_NAME);
return new ProcessResult(false, "Shell scripts cannot run on Windows");
}
} else {
// 1. 授权
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
// 等待返回这里不可能导致死锁shell产生大量数据可能导致死锁
chmodPb.start().waitFor();
}
// 2. 执行目标脚本
ProcessBuilder pb = new ProcessBuilder(fetchRunCommand(), scriptPath);
Process process = pb.start();
StringBuilder inputSB = new StringBuilder();
StringBuilder errorSB = new StringBuilder();
// 为了代码优雅而牺牲那么一点点点点点点点点性能
// 从外部传入线程池总感觉怪怪的...内部创建嘛又要考虑考虑资源释放问题想来想去还是直接创建算了
new Thread(() -> copyStream(process.getInputStream(), inputSB, omsLogger)).start();
new Thread(() -> copyStream(process.getErrorStream(), errorSB, omsLogger)).start();
try {
boolean s = process.waitFor(timeout, TimeUnit.MILLISECONDS);
if (!s) {
omsLogger.info("SYSTEM===> process timeout");
return new ProcessResult(false, "TIMEOUT");
}
String result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString());
// 0 代表正常退出
int exitValue = process.exitValue();
return new ProcessResult(exitValue == 0, result);
}catch (InterruptedException ie) {
omsLogger.info("SYSTEM===> ScriptProcessor has been interrupted");
return new ProcessResult(false, "Interrupted");
}
}
private void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger) {
String line;
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
while ((line = br.readLine()) != null) {
sb.append(line);
// 同步到在线日志
omsLogger.info(line);
}
} catch (Exception e) {
log.warn("[ScriptProcessor] copyStream failed.", e);
omsLogger.warn("[ScriptProcessor] copyStream failed.", e);
sb.append("Exception: ").append(e);
}
}
/**
* 生成脚本名称
* @return 文件名称
*/
protected abstract String genScriptName();
/**
* 获取运行命令egshell返回 /bin/sh
* @return 执行脚本的命令
*/
protected abstract String fetchRunCommand();
}

View File

@ -1,28 +0,0 @@
package com.github.kfcfans.powerjob.worker.core.processor.built;
import lombok.extern.slf4j.Slf4j;
/**
* Shell 处理器
* ProcessorTracker 创建
*
* @author tjq
* @since 2020/4/15
*/
@Slf4j
public class ShellProcessor extends ScriptProcessor {
public ShellProcessor(Long instanceId, String processorInfo, long timeout) throws Exception {
super(instanceId, processorInfo, timeout);
}
@Override
protected String genScriptName() {
return String.format("shell_%d.sh", instanceId);
}
@Override
protected String fetchRunCommand() {
return "/bin/sh";
}
}

View File

@ -11,8 +11,6 @@ import com.github.kfcfans.powerjob.worker.container.OmsContainer;
import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory;
import com.github.kfcfans.powerjob.worker.core.ProcessorBeanFactory;
import com.github.kfcfans.powerjob.worker.core.executor.ProcessorRunnable;
import com.github.kfcfans.powerjob.worker.core.processor.built.PythonProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.built.ShellProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.github.kfcfans.powerjob.worker.log.impl.OmsServerLogger;
@ -337,12 +335,6 @@ public class ProcessorTracker {
processor = ProcessorBeanFactory.getInstance().getLocalProcessor(processorInfo);
}
break;
case SHELL:
processor = new ShellProcessor(instanceId, processorInfo, instanceInfo.getInstanceTimeoutMS());
break;
case PYTHON:
processor = new PythonProcessor(instanceId, processorInfo, instanceInfo.getInstanceTimeoutMS());
break;
case JAVA_CONTAINER:
String[] split = processorInfo.split("#");
log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", instanceId, split[1], split[0]);

View File

@ -1,55 +0,0 @@
package com.github.kfcfans.powerjob;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.built.PythonProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.built.ShellProcessor;
import com.github.kfcfans.powerjob.worker.log.impl.OmsServerLogger;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ThreadLocalRandom;
/**
* 测试脚本处理器
*
* @author tjq
* @since 2020/4/15
*/
public class ScriptProcessorTest {
private static final long timeout = 10000;
private static final TaskContext context = new TaskContext();
@BeforeAll
public static void initContext() {
context.setOmsLogger(new OmsServerLogger(1L));
}
@Test
public void testLocalShellProcessor() throws Exception {
ShellProcessor sp = new ShellProcessor(1L, "ls -a", timeout);
System.out.println(sp.process(context));
ShellProcessor sp2 = new ShellProcessor(2777L, "pwd", timeout);
System.out.println(sp2.process(context));
}
@Test
public void testLocalPythonProcessor() throws Exception {
PythonProcessor pp = new PythonProcessor(2L, "print 'Hello World!'", timeout);
System.out.println(pp.process(context));
}
@Test
public void testNetShellProcessor() throws Exception {
ShellProcessor sp = new ShellProcessor(18L, "http://localhost:8080/test/test.sh", timeout);
System.out.println(sp.process(context));
}
@Test
public void testFailedScript() throws Exception {
ShellProcessor sp3 = new ShellProcessor(ThreadLocalRandom.current().nextLong(), "mvn tjq", timeout);
System.out.println(sp3.process(context));
}
}