diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/DispatchStrategy.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/DispatchStrategy.java similarity index 90% rename from powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/DispatchStrategy.java rename to powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/DispatchStrategy.java index 3fb5b2f4..d28a983f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/DispatchStrategy.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/DispatchStrategy.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.powerjob.server.common.constans; +package com.github.kfcfans.powerjob.common; import lombok.AllArgsConstructor; import lombok.Getter; diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/ProcessorType.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/ProcessorType.java index 63a0455a..66073904 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/ProcessorType.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/ProcessorType.java @@ -14,9 +14,12 @@ import lombok.Getter; public enum ProcessorType { EMBEDDED_JAVA(1, "内置JAVA处理器"), + JAVA_CONTAINER(4, "Java容器"), + + @Deprecated SHELL(2, "SHELL脚本"), - PYTHON(3, "Python脚本"), - JAVA_CONTAINER(4, "Java容器"); + @Deprecated + PYTHON(3, "Python脚本"); private final int v; private final String des; diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java index 61199ae5..20101237 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.common.request.http; +import com.github.kfcfans.powerjob.common.DispatchStrategy; import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.ProcessorType; import com.github.kfcfans.powerjob.common.TimeExpressionType; @@ -127,7 +128,7 @@ public class SaveJobInfoRequest { private String extra; - private Integer dispatchStrategy; + private DispatchStrategy dispatchStrategy; private String lifecycle; @@ -143,4 +144,11 @@ public class SaveJobInfoRequest { CommonUtils.requireNonNull(processorType, "processorType can't be empty"); CommonUtils.requireNonNull(timeExpressionType, "timeExpressionType can't be empty"); } + + public DispatchStrategy getDispatchStrategy() { + if (dispatchStrategy == null) { + return DispatchStrategy.HEALTH_FIRST; + } + return dispatchStrategy; + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java index acb58b1c..96833408 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java @@ -66,10 +66,8 @@ public class JobInfoDO { */ private Integer processorType; /** - * 执行器信息(可能需要存储整个脚本文件) + * 执行器信息 */ - @Lob - @Column private String processorInfo; /* ************************** 运行时配置 ************************** */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/DispatchService.java index d24ea007..729ccfab 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/DispatchService.java @@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.remote; import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; -import com.github.kfcfans.powerjob.server.common.constans.DispatchStrategy; +import com.github.kfcfans.powerjob.common.DispatchStrategy; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 4fe4372c..a8629159 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -7,16 +7,15 @@ import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.response.JobInfoDTO; import com.github.kfcfans.powerjob.server.common.SJ; -import com.github.kfcfans.powerjob.server.common.constans.DispatchStrategy; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; -import com.github.kfcfans.powerjob.server.remote.DispatchService; -import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer; import com.github.kfcfans.powerjob.server.common.utils.CronExpression; import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; +import com.github.kfcfans.powerjob.server.remote.DispatchService; +import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer; import com.github.kfcfans.powerjob.server.service.instance.InstanceService; import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService; import lombok.extern.slf4j.Slf4j; @@ -79,7 +78,7 @@ public class JobService { jobInfoDO.setProcessorType(request.getProcessorType().getV()); jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV()); jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV()); - jobInfoDO.setDispatchStrategy(DispatchStrategy.of(request.getDispatchStrategy()).getV()); + jobInfoDO.setDispatchStrategy(request.getDispatchStrategy().getV()); // 填充默认值,非空保护防止 NPE fillDefaultValue(jobInfoDO); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/MigrateController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/MigrateController.java new file mode 100644 index 00000000..8d0c40e8 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/MigrateController.java @@ -0,0 +1,89 @@ +package com.github.kfcfans.powerjob.server.web.controller; + +import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.powerjob.common.ProcessorType; +import com.github.kfcfans.powerjob.common.response.ResultDTO; +import com.github.kfcfans.powerjob.server.extension.LockService; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.jpa.domain.Specification; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import javax.persistence.criteria.Predicate; +import java.util.List; +import java.util.Set; + +/** + * Help users upgrade from a low version of powerjob-server to a high version of powerjob-server + * v4 means that this interface was upgraded from version v3.x to v4.x, and so on + * + * @author tjq + * @since 2021/2/23 + */ +@Slf4j +@RestController +@RequestMapping("/migrate") +public class MigrateController { + + @Resource + private LockService lockService; + @Resource + private JobInfoRepository jobInfoRepository; + + @GetMapping("/v4/script") + public ResultDTO migrateScriptFromV3ToV4(Long appId) { + + JSONObject resultLog = new JSONObject(); + resultLog.put("docs", "https://www.yuque.com/powerjob/guidence/official_processor"); + resultLog.put("tips", "please add the maven dependency of 'powerjob-official-processors'"); + + String lock = "migrateScriptFromV3ToV4-" + appId; + boolean getLock = lockService.tryLock(lock, 60000); + if (!getLock) { + return ResultDTO.failed("get lock failed, maybe other migrate job is running"); + } + try { + Set convertedJobIds = Sets.newHashSet(); + + Specification specification = (Specification) (root, query, criteriaBuilder) -> { + List predicates = Lists.newLinkedList(); + List scriptJobTypes = Lists.newArrayList(ProcessorType.SHELL.getV(), ProcessorType.PYTHON.getV()); + predicates.add(criteriaBuilder.equal(root.get("appId"), appId)); + predicates.add(root.get("processorType").in(scriptJobTypes)); + return query.where(predicates.toArray(new Predicate[0])).getRestriction(); + }; + List scriptJobs = jobInfoRepository.findAll(specification); + + resultLog.put("scriptJobsNum", scriptJobs.size()); + resultLog.put("convertedJobIds", convertedJobIds); + + log.info("[MigrateScriptFromV3ToV4] script job num: {}", scriptJobs.size()); + scriptJobs.forEach(job -> { + + ProcessorType oldProcessorType = ProcessorType.of(job.getProcessorType()); + + job.setJobParams(job.getProcessorInfo()); + job.setProcessorType(ProcessorType.EMBEDDED_JAVA.getV()); + + if (oldProcessorType == ProcessorType.PYTHON) { + job.setProcessorInfo("tech.powerjob.official.processors.impl.script.PythonProcessor"); + } else { + job.setProcessorInfo("tech.powerjob.official.processors.impl.script.ShellProcessor"); + } + + jobInfoRepository.saveAndFlush(job); + convertedJobIds.add(job.getId()); + }); + return ResultDTO.success(resultLog); + } finally { + lockService.unlock(lock); + } + } + +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/JobInfoVO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/JobInfoVO.java index ee0ed645..c6a222d5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/JobInfoVO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/JobInfoVO.java @@ -5,7 +5,7 @@ import com.github.kfcfans.powerjob.common.ProcessorType; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.server.common.SJ; -import com.github.kfcfans.powerjob.server.common.constans.DispatchStrategy; +import com.github.kfcfans.powerjob.common.DispatchStrategy; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.google.common.collect.Lists; diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/PythonProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/PythonProcessor.java deleted file mode 100644 index 8973efad..00000000 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/PythonProcessor.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.github.kfcfans.powerjob.worker.core.processor.built; - -/** - * Python 处理器 - * - * @author tjq - * @since 2020/4/16 - */ -public class PythonProcessor extends ScriptProcessor { - - public PythonProcessor(Long instanceId, String processorInfo, long timeout) throws Exception { - super(instanceId, processorInfo, timeout); - } - - @Override - protected String genScriptName() { - return String.format("python_%d.py", instanceId); - } - - @Override - protected String fetchRunCommand() { - return "python"; - } -} diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java deleted file mode 100644 index 7d070a3e..00000000 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java +++ /dev/null @@ -1,147 +0,0 @@ -package com.github.kfcfans.powerjob.worker.core.processor.built; - -import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils; -import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; -import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; -import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; -import com.github.kfcfans.powerjob.worker.log.OmsLogger; -import com.google.common.collect.Sets; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.SystemUtils; - -import java.io.*; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -/** - * 脚本处理器 - * - * @author tjq - * @since 2020/4/16 - */ -@Slf4j -public abstract class ScriptProcessor implements BasicProcessor { - - protected final Long instanceId; - // 脚本绝对路径 - private final String scriptPath; - private final long timeout; - - private static final Set DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp"); - - public ScriptProcessor(Long instanceId, String processorInfo, long timeout) throws Exception { - - this.instanceId = instanceId; - this.scriptPath = OmsWorkerFileUtils.getScriptDir() + genScriptName(); - this.timeout = timeout; - - File script = new File(scriptPath); - if (script.exists()) { - return; - } - - File dir = new File(script.getParent()); - boolean success = dir.mkdirs(); - if (!success) { - throw new RuntimeException("create script folder failed."); - } - success = script.createNewFile(); - if (!success) { - throw new RuntimeException("create script file failed"); - } - - // 如果是下载链接,则从网络获取 - for (String protocol : DOWNLOAD_PROTOCOL) { - if (processorInfo.startsWith(protocol)) { - FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000); - return; - } - } - - // 非下载链接,为 processInfo 生成可执行文件 - try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) { - bw.write(processorInfo); - bw.flush(); - } - } - - @Override - public ProcessResult process(TaskContext context) throws Exception { - - OmsLogger omsLogger = context.getOmsLogger(); - - omsLogger.info("SYSTEM===> ScriptProcessor start to process"); - - if (SystemUtils.IS_OS_WINDOWS) { - if (StringUtils.equals(fetchRunCommand(), "/bin/bash")) { - omsLogger.warn("Current OS is {} where shell scripts cannot run.", SystemUtils.OS_NAME); - return new ProcessResult(false, "Shell scripts cannot run on Windows"); - } - } else { - // 1. 授权 - ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); - // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) - chmodPb.start().waitFor(); - } - - // 2. 执行目标脚本 - ProcessBuilder pb = new ProcessBuilder(fetchRunCommand(), scriptPath); - Process process = pb.start(); - - StringBuilder inputSB = new StringBuilder(); - StringBuilder errorSB = new StringBuilder(); - - // 为了代码优雅而牺牲那么一点点点点点点点点性能 - // 从外部传入线程池总感觉怪怪的...内部创建嘛又要考虑考虑资源释放问题,想来想去还是直接创建算了。 - new Thread(() -> copyStream(process.getInputStream(), inputSB, omsLogger)).start(); - new Thread(() -> copyStream(process.getErrorStream(), errorSB, omsLogger)).start(); - - try { - boolean s = process.waitFor(timeout, TimeUnit.MILLISECONDS); - if (!s) { - omsLogger.info("SYSTEM===> process timeout"); - return new ProcessResult(false, "TIMEOUT"); - } - String result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString()); - - // 0 代表正常退出 - int exitValue = process.exitValue(); - return new ProcessResult(exitValue == 0, result); - }catch (InterruptedException ie) { - omsLogger.info("SYSTEM===> ScriptProcessor has been interrupted"); - return new ProcessResult(false, "Interrupted"); - } - } - - private void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger) { - String line; - try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { - while ((line = br.readLine()) != null) { - sb.append(line); - // 同步到在线日志 - omsLogger.info(line); - } - } catch (Exception e) { - log.warn("[ScriptProcessor] copyStream failed.", e); - omsLogger.warn("[ScriptProcessor] copyStream failed.", e); - - sb.append("Exception: ").append(e); - } - } - - /** - * 生成脚本名称 - * @return 文件名称 - */ - protected abstract String genScriptName(); - - /** - * 获取运行命令(eg,shell返回 /bin/sh) - * @return 执行脚本的命令 - */ - protected abstract String fetchRunCommand(); -} diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ShellProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ShellProcessor.java deleted file mode 100644 index d712bd5c..00000000 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ShellProcessor.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.github.kfcfans.powerjob.worker.core.processor.built; - -import lombok.extern.slf4j.Slf4j; - -/** - * Shell 处理器 - * 由 ProcessorTracker 创建 - * - * @author tjq - * @since 2020/4/15 - */ -@Slf4j -public class ShellProcessor extends ScriptProcessor { - - public ShellProcessor(Long instanceId, String processorInfo, long timeout) throws Exception { - super(instanceId, processorInfo, timeout); - } - - @Override - protected String genScriptName() { - return String.format("shell_%d.sh", instanceId); - } - - @Override - protected String fetchRunCommand() { - return "/bin/sh"; - } -} diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index fdef8c7b..95258ecc 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -11,8 +11,6 @@ import com.github.kfcfans.powerjob.worker.container.OmsContainer; import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory; import com.github.kfcfans.powerjob.worker.core.ProcessorBeanFactory; import com.github.kfcfans.powerjob.worker.core.executor.ProcessorRunnable; -import com.github.kfcfans.powerjob.worker.core.processor.built.PythonProcessor; -import com.github.kfcfans.powerjob.worker.core.processor.built.ShellProcessor; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.github.kfcfans.powerjob.worker.log.impl.OmsServerLogger; @@ -337,12 +335,6 @@ public class ProcessorTracker { processor = ProcessorBeanFactory.getInstance().getLocalProcessor(processorInfo); } break; - case SHELL: - processor = new ShellProcessor(instanceId, processorInfo, instanceInfo.getInstanceTimeoutMS()); - break; - case PYTHON: - processor = new PythonProcessor(instanceId, processorInfo, instanceInfo.getInstanceTimeoutMS()); - break; case JAVA_CONTAINER: String[] split = processorInfo.split("#"); log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", instanceId, split[1], split[0]); diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/ScriptProcessorTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/ScriptProcessorTest.java deleted file mode 100644 index e7217255..00000000 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/ScriptProcessorTest.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.github.kfcfans.powerjob; - -import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; -import com.github.kfcfans.powerjob.worker.core.processor.built.PythonProcessor; -import com.github.kfcfans.powerjob.worker.core.processor.built.ShellProcessor; -import com.github.kfcfans.powerjob.worker.log.impl.OmsServerLogger; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.ThreadLocalRandom; - -/** - * 测试脚本处理器 - * - * @author tjq - * @since 2020/4/15 - */ -public class ScriptProcessorTest { - - private static final long timeout = 10000; - - private static final TaskContext context = new TaskContext(); - - @BeforeAll - public static void initContext() { - context.setOmsLogger(new OmsServerLogger(1L)); - } - - @Test - public void testLocalShellProcessor() throws Exception { - ShellProcessor sp = new ShellProcessor(1L, "ls -a", timeout); - System.out.println(sp.process(context)); - - ShellProcessor sp2 = new ShellProcessor(2777L, "pwd", timeout); - System.out.println(sp2.process(context)); - } - - @Test - public void testLocalPythonProcessor() throws Exception { - PythonProcessor pp = new PythonProcessor(2L, "print 'Hello World!'", timeout); - System.out.println(pp.process(context)); - } - - @Test - public void testNetShellProcessor() throws Exception { - ShellProcessor sp = new ShellProcessor(18L, "http://localhost:8080/test/test.sh", timeout); - System.out.println(sp.process(context)); - } - - @Test - public void testFailedScript() throws Exception { - ShellProcessor sp3 = new ShellProcessor(ThreadLocalRandom.current().nextLong(), "mvn tjq", timeout); - System.out.println(sp3.process(context)); - } -}