From 05a6016945ec8b9b942595a4a362a53bd77c151d Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 9 Aug 2020 18:13:20 +0800 Subject: [PATCH 01/17] [opt] %logger{50} -> %logger{20} --- powerjob-server/src/main/resources/logback-dev.xml | 2 +- powerjob-server/src/main/resources/logback-product.xml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/powerjob-server/src/main/resources/logback-dev.xml b/powerjob-server/src/main/resources/logback-dev.xml index 94e07a92..ec9b4011 100644 --- a/powerjob-server/src/main/resources/logback-dev.xml +++ b/powerjob-server/src/main/resources/logback-dev.xml @@ -10,7 +10,7 @@ converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/> + value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{20}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/> diff --git a/powerjob-server/src/main/resources/logback-product.xml b/powerjob-server/src/main/resources/logback-product.xml index b14658f3..4fc5ea1e 100644 --- a/powerjob-server/src/main/resources/logback-product.xml +++ b/powerjob-server/src/main/resources/logback-product.xml @@ -16,7 +16,7 @@ 7 - %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{20} - %msg%n UTF-8 @@ -53,7 +53,7 @@ 7 - %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{20} - %msg%n UTF-8 true From 76ed28003fa85070689327fb5672d64af25f434c Mon Sep 17 00:00:00 2001 From: songyinyin Date: Sun, 9 Aug 2020 23:04:06 +0800 Subject: [PATCH 02/17] [dev] worker properties change from powerjob.xxx to powerjob.worker.xxx & If serverAddress is not exist, it will not start PowerJob worker --- .../src/main/resources/application.properties | 8 +- .../PowerJobAutoConfiguration.java | 36 ++++- .../autoconfigure/PowerJobProperties.java | 135 ++++++++++++++---- .../spring-configuration-metadata.json | 98 ++++++++++--- 4 files changed, 222 insertions(+), 55 deletions(-) diff --git a/powerjob-worker-samples/src/main/resources/application.properties b/powerjob-worker-samples/src/main/resources/application.properties index ef35ebd2..7c3ff3fa 100644 --- a/powerjob-worker-samples/src/main/resources/application.properties +++ b/powerjob-worker-samples/src/main/resources/application.properties @@ -4,10 +4,10 @@ spring.jpa.open-in-view=false ########### powerjob-worker 配置 ########### # akka 工作端口,可选,默认 27777 -powerjob.akka-port=27777 +powerjob.worker.akka-port=27777 # 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称 -powerjob.app-name=powerjob-agent-test +powerjob.worker.app-name=powerjob-agent-test # 调度服务器地址,IP:Port 或 域名,多值逗号分隔 -powerjob.server-address=127.0.0.1:7700,127.0.0.1:7701 +powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701 # 持久化方式,可选,默认 disk -powerjob.store-strategy=disk \ No newline at end of file +powerjob.worker.store-strategy=disk \ No newline at end of file diff --git a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java index 6f6000c4..31615b54 100644 --- a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java +++ b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java @@ -3,9 +3,12 @@ package com.github.kfcfans.powerjob.worker.autoconfigure; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.OhMyConfig; +import org.springframework.boot.autoconfigure.condition.AnyNestedCondition; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import java.util.Arrays; @@ -19,32 +22,53 @@ import java.util.List; */ @Configuration @EnableConfigurationProperties(PowerJobProperties.class) +@Conditional(PowerJobAutoConfiguration.PowerJobWorkerCondition.class) public class PowerJobAutoConfiguration { @Bean @ConditionalOnMissingBean public OhMyWorker initPowerJob(PowerJobProperties properties) { + PowerJobProperties.Worker worker = properties.getWorker(); + // 服务器HTTP地址(端口号为 server.port,而不是 ActorSystem port),请勿添加任何前缀(http://) - CommonUtils.requireNonNull(properties.getServerAddress(), "serverAddress can't be empty!"); - List serverAddress = Arrays.asList(properties.getServerAddress().split(",")); + CommonUtils.requireNonNull(worker.getServerAddress(), "serverAddress can't be empty!"); + List serverAddress = Arrays.asList(worker.getServerAddress().split(",")); // 1. 创建配置文件 OhMyConfig config = new OhMyConfig(); // 可以不显式设置,默认值 27777 - config.setPort(properties.getAkkaPort()); + config.setPort(worker.getAkkaPort()); // appName,需要提前在控制台注册,否则启动报错 - config.setAppName(properties.getAppName()); + config.setAppName(worker.getAppName()); config.setServerAddress(serverAddress); // 如果没有大型 Map/MapReduce 的需求,建议使用内存来加速计算 // 有大型 Map/MapReduce 需求,可能产生大量子任务(Task)的场景,请使用 DISK,否则妥妥的 OutOfMemory - config.setStoreStrategy(properties.getStoreStrategy()); + config.setStoreStrategy(worker.getStoreStrategy()); // 启动测试模式,true情况下,不再尝试连接 server 并验证appName - config.setEnableTestMode(properties.isEnableTestMode()); + config.setEnableTestMode(worker.isEnableTestMode()); // 2. 创建 Worker 对象,设置配置文件 OhMyWorker ohMyWorker = new OhMyWorker(); ohMyWorker.setConfig(config); return ohMyWorker; } + + static class PowerJobWorkerCondition extends AnyNestedCondition { + + public PowerJobWorkerCondition() { + super(ConfigurationPhase.PARSE_CONFIGURATION); + } + + @Deprecated + @ConditionalOnProperty(prefix = "powerjob", name = "server-address") + static class PowerJobProperty { + + } + + @ConditionalOnProperty(prefix = "powerjob.worker", name = "server-address") + static class PowerJobWorkerProperty { + + } + } } diff --git a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java index b8cf30b9..480e9e8d 100644 --- a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java +++ b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java @@ -3,8 +3,10 @@ package com.github.kfcfans.powerjob.worker.autoconfigure; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; -import lombok.Data; +import lombok.Getter; +import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.DeprecatedConfigurationProperty; /** * PowerJob 配置项 @@ -12,33 +14,114 @@ import org.springframework.boot.context.properties.ConfigurationProperties; * @author songyinyin * @since 2020/7/26 16:37 */ -@Data @ConfigurationProperties(prefix = "powerjob") public class PowerJobProperties { + + private final Worker worker = new Worker(); + + public Worker getWorker() { + return worker; + } + + @Deprecated + @DeprecatedConfigurationProperty(replacement = "powerjob.worker.app-name") + public String getAppName() { + return getWorker().appName; + } + + @Deprecated + public void setAppName(String appName) { + getWorker().setAppName(appName); + } + + @Deprecated + @DeprecatedConfigurationProperty(replacement = "powerjob.worker.akka-port") + public int getAkkaPort() { + return getWorker().akkaPort; + } + + @Deprecated + public void setAkkaPort(int akkaPort) { + getWorker().setAkkaPort(akkaPort); + } + + @Deprecated + @DeprecatedConfigurationProperty(replacement = "powerjob.worker.server-address") + public String getServerAddress() { + return getWorker().serverAddress; + } + + @Deprecated + public void setServerAddress(String serverAddress) { + getWorker().setServerAddress(serverAddress); + } + + @Deprecated + @DeprecatedConfigurationProperty(replacement = "powerjob.worker.store-strategy") + public StoreStrategy getStoreStrategy() { + return getWorker().storeStrategy; + } + + @Deprecated + public void setStoreStrategy(StoreStrategy storeStrategy) { + getWorker().setStoreStrategy(storeStrategy); + } + + @Deprecated + @DeprecatedConfigurationProperty(replacement = "powerjob.worker.max-result-length") + public int getMaxResultLength() { + return getWorker().maxResultLength; + } + + @Deprecated + public void setMaxResultLength(int maxResultLength) { + getWorker().setMaxResultLength(maxResultLength); + } + + @Deprecated + @DeprecatedConfigurationProperty(replacement = "powerjob.worker.enable-test-mode") + public boolean isEnableTestMode() { + return getWorker().enableTestMode; + } + + @Deprecated + public void setEnableTestMode(boolean enableTestMode) { + getWorker().setEnableTestMode(enableTestMode); + } + + + /** - * 应用名称,需要提前在控制台注册,否则启动报错 + * 客户端 配置项 */ - private String appName; - /** - * 启动 akka 端口 - */ - private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT; - /** - * 调度服务器地址,ip:port 或 域名,多个用英文逗号分隔 - */ - private String serverAddress; - /** - * 本地持久化方式,默认使用磁盘 - */ - private StoreStrategy storeStrategy = StoreStrategy.DISK; - /** - * 最大返回值长度,超过会被截断 - * {@link ProcessResult}#msg 的最大长度 - */ - private int maxResultLength = 8096; - /** - * 启动测试模式,true情况下,不再尝试连接 server 并验证appName。 - * true -> 用于本地写单元测试调试; false -> 默认值,标准模式 - */ - private boolean enableTestMode = false; + @Setter + @Getter + public static class Worker { + /** + * 应用名称,需要提前在控制台注册,否则启动报错 + */ + private String appName; + /** + * 启动 akka 端口 + */ + private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT; + /** + * 调度服务器地址,ip:port 或 域名,多个用英文逗号分隔 + */ + private String serverAddress; + /** + * 本地持久化方式,默认使用磁盘 + */ + private StoreStrategy storeStrategy = StoreStrategy.DISK; + /** + * 最大返回值长度,超过会被截断 + * {@link ProcessResult}#msg 的最大长度 + */ + private int maxResultLength = 8096; + /** + * 启动测试模式,true情况下,不再尝试连接 server 并验证appName。 + * true -> 用于本地写单元测试调试; false -> 默认值,标准模式 + */ + private boolean enableTestMode = false; + } } diff --git a/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json b/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json index 1fbedb0a..5fe74cec 100644 --- a/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json +++ b/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json @@ -4,46 +4,106 @@ "name": "powerjob", "type": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" + }, + { + "name": "powerjob.worker", + "type": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "sourceMethod": "getWorker()" } ], "properties": [ { - "name": "powerjob.app-name", - "type": "java.lang.String", - "description": "应用名称,需要提前在控制台注册,否则启动报错", - "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" + "name": "powerjob.worker.akka-port", + "type": "java.lang.Integer", + "description": "启动 akka 端口", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" }, { - "name": "powerjob.max-result-length", + "name": "powerjob.worker.app-name", + "type": "java.lang.String", + "description": "应用名称,需要提前在控制台注册,否则启动报错", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" + }, + { + "name": "powerjob.worker.enable-test-mode", + "type": "java.lang.Boolean", + "description": "启动测试模式,true情况下,不再尝试连接 server 并验证appName。 true -> 用于本地写单元测试调试; false -> 默认值,标准模式", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker", + "defaultValue": false + }, + { + "name": "powerjob.worker.max-result-length", "type": "java.lang.Integer", "description": "最大返回值长度,超过会被截断 {@link ProcessResult}#msg 的最大长度", - "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker", "defaultValue": 8096 }, + { + "name": "powerjob.worker.server-address", + "type": "java.lang.String", + "description": "调度服务器地址,ip:port 或 域名,多个用英文逗号分隔", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" + }, + { + "name": "powerjob.worker.store-strategy", + "type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy", + "description": "本地持久化方式,默认使用磁盘", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" + }, { "name": "powerjob.akka-port", "type": "java.lang.Integer", - "description": "启动 akka 端口", - "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "deprecated": true, + "deprecation": { + "replacement": "powerjob.worker.akka-port" + } }, { - "name": "powerjob.server-address", + "name": "powerjob.app-name", "type": "java.lang.String", - "description": "调度服务器地址,ip:port 或 域名,多值用英文逗号分隔", - "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" - }, - { - "name": "powerjob.store-strategy", - "type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy", - "description": "本地持久化方式,默认使用磁盘", - "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "deprecated": true, + "deprecation": { + "replacement": "powerjob.worker.app-name" + } }, { "name": "powerjob.enable-test-mode", "type": "java.lang.Boolean", - "description": "启动测试模式,true情况下,不再尝试连接 server 并验证appName。true -> 用于本地写单元测试调试; false -> 默认值,标准模式", "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", - "defaultValue": false + "deprecated": true, + "deprecation": { + "replacement": "powerjob.worker.enable-test-mode" + } + }, + { + "name": "powerjob.max-result-length", + "type": "java.lang.Integer", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "deprecated": true, + "deprecation": { + "replacement": "powerjob.worker.max-result-length" + } + }, + { + "name": "powerjob.server-address", + "type": "java.lang.String", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "deprecated": true, + "deprecation": { + "replacement": "powerjob.worker.server-address" + } + }, + { + "name": "powerjob.store-strategy", + "type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "deprecated": true, + "deprecation": { + "replacement": "powerjob.worker.store-strategy" + } } ], "hints": [] From f36c06c26c06632ea86f4e822bd6fcb15f3b4a03 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 9 Aug 2020 23:36:37 +0800 Subject: [PATCH 03/17] [release] v3.2.3 --- .../src/main/resources/application.properties | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/powerjob-worker-samples/src/main/resources/application.properties b/powerjob-worker-samples/src/main/resources/application.properties index 7c3ff3fa..17bfa59f 100644 --- a/powerjob-worker-samples/src/main/resources/application.properties +++ b/powerjob-worker-samples/src/main/resources/application.properties @@ -2,7 +2,7 @@ server.port=8081 spring.jpa.open-in-view=false -########### powerjob-worker 配置 ########### +########### powerjob-worker 配置(老配置 powerjob.xxx 即将废弃,请使用 powerjob.worker.xxx) ########### # akka 工作端口,可选,默认 27777 powerjob.worker.akka-port=27777 # 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称 @@ -10,4 +10,6 @@ powerjob.worker.app-name=powerjob-agent-test # 调度服务器地址,IP:Port 或 域名,多值逗号分隔 powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701 # 持久化方式,可选,默认 disk -powerjob.worker.store-strategy=disk \ No newline at end of file +powerjob.worker.store-strategy=disk +# 返回值最大长度,默认 8096 +powerjob.worker.max-result-length=4096 \ No newline at end of file From df1e5718dab262798e14238fe970555d9729a37b Mon Sep 17 00:00:00 2001 From: tjq Date: Mon, 10 Aug 2020 00:04:17 +0800 Subject: [PATCH 04/17] [fix] exclude log4j in powerjob-server to avoid compile error --- powerjob-server/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 9b2b2378..4a9e78a5 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -166,6 +166,12 @@ com.aliyun alibaba-dingtalk-service-sdk ${dingding.version} + + + log4j + log4j + + From 11c24f4fd6a80c2bada83bc02c158b55b0cb592f Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 16 Aug 2020 22:23:05 +0800 Subject: [PATCH 05/17] [docs] update readme.md --- README.md | 2 +- .../com/github/kfcfans/powerjob/server/service/JobService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 658653a3..936894e6 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ PowerJob 的设计目标为企业级的分布式任务调度平台,即成为 | 在线任务治理 | 不支持 | 支持 | 支持 | **支持** | | 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** | | 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** | -| 报警监控 | 无 | 邮件 | 短信 | **邮件,提供接口允许开发者扩展** | +| 报警监控 | 无 | 邮件 | 短信 | **邮件与钉钉,并支持开发者扩展** | | 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** | | DAG工作流 | 不支持 | 不支持 | 支持 | **支持** | diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 9b3810e8..9b8a874c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -70,7 +70,7 @@ public class JobService { jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV()); if (jobInfoDO.getMaxWorkerCount() == null) { - jobInfoDO.setMaxInstanceNum(0); + jobInfoDO.setMaxWorkerCount(0); } // 转化报警用户列表 From d5c26e70d8ac865810c060ed9d536d36b3f0b73c Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 10:45:57 +0800 Subject: [PATCH 06/17] feat: FrequentJob sort by subInstanceId #63 --- .../powerjob/worker/core/tracker/task/FrequentTaskTracker.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index 00cd28cc..5f797461 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -123,6 +123,9 @@ public class FrequentTaskTracker extends TaskTracker { history.add(subDetail); }); + // 按 subInstanceId 排序 issue#63 + history.sort((o1, o2) -> (int) (o2.getSubInstanceId() - o1.getSubInstanceId())); + detail.setSubInstanceDetails(history); return detail; } From d13bbc77dce31f6984c905cb4da7f2f78c11030c Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 10:48:54 +0800 Subject: [PATCH 07/17] fix: InstanceDTO don't have wfInstanceId --- .../kfcfans/powerjob/common/response/InstanceInfoDTO.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/InstanceInfoDTO.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/InstanceInfoDTO.java index 3f26e896..c95f25df 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/InstanceInfoDTO.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/InstanceInfoDTO.java @@ -20,12 +20,16 @@ public class InstanceInfoDTO { private Long appId; // 任务实例ID private Long instanceId; + // 工作流实例ID + private Long wfInstanceId; // 任务实例参数 private String instanceParams; /** * 任务状态 {@link InstanceStatus} */ private int status; + // 该任务实例的类型,普通/工作流(InstanceType) + private Integer type; // 执行结果 private String result; // 预计触发时间 From 33588a6d05e42601c852280d8814825862a7eb78 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 10:52:26 +0800 Subject: [PATCH 08/17] fix: Oracle ZHS16GBK #72 --- powerjob-server/pom.xml | 5 +++++ powerjob-server/src/main/resources/logback-product.xml | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 4a9e78a5..a6434cec 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -57,6 +57,11 @@ ojdbc8 ${ojdbc.version} + + com.oracle.database.nls + orai18n + ${ojdbc.version} + com.microsoft.sqlserver diff --git a/powerjob-server/src/main/resources/logback-product.xml b/powerjob-server/src/main/resources/logback-product.xml index 4fc5ea1e..9f63badf 100644 --- a/powerjob-server/src/main/resources/logback-product.xml +++ b/powerjob-server/src/main/resources/logback-product.xml @@ -8,6 +8,13 @@ --> + + + ${CONSOLE_LOG_PATTERN} + utf8 + + + ${LOG_PATH}/powerjob-server-error.log @@ -61,6 +68,7 @@ + From 3a32eaea04cfe8e35d2b8598fccb51a40e8da60e Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 11:02:23 +0800 Subject: [PATCH 09/17] feat: modify banner --- powerjob-server/src/main/resources/banner.txt | 3 ++- .../worker/common/OmsBannerPrinter.java | 21 ++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/powerjob-server/src/main/resources/banner.txt b/powerjob-server/src/main/resources/banner.txt index dffacc32..0d73c6d3 100644 --- a/powerjob-server/src/main/resources/banner.txt +++ b/powerjob-server/src/main/resources/banner.txt @@ -8,6 +8,7 @@ ${AnsiColor.GREEN} ░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████ ░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░ ${AnsiColor.BRIGHT_RED} -* Maintainer: tengjiqi@gmail.com +* Maintainer: tengjiqi@gmail.com & PowerJob-Team +* OfficialWebsite: http://www.powerjob.tech/ * SourceCode: https://github.com/KFCFans/PowerJob * PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OmsBannerPrinter.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OmsBannerPrinter.java index 1b1daf6b..227d5bec 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OmsBannerPrinter.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OmsBannerPrinter.java @@ -11,21 +11,28 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public final class OmsBannerPrinter { - private static final String BANNER = "\n ███████ ██ ██ \n" + - "░██░░░░██ ░██ ░██ \n" + - "░██ ░██ ██████ ███ ██ █████ ██████ ░██ ██████ ░██ \n" + - "░███████ ██░░░░██░░██ █ ░██ ██░░░██░░██░░█ ░██ ██░░░░██░██████ \n" + + private static final String BANNER = "" + + "\n" + + " ███████ ██ ██\n" + + "░██░░░░██ ░██ ░██\n" + + "░██ ░██ ██████ ███ ██ █████ ██████ ░██ ██████ ░██\n" + + "░███████ ██░░░░██░░██ █ ░██ ██░░░██░░██░░█ ░██ ██░░░░██░██████\n" + "░██░░░░ ░██ ░██ ░██ ███░██░███████ ░██ ░ ░██░██ ░██░██░░░██\n" + "░██ ░██ ░██ ░████░████░██░░░░ ░██ ██ ░██░██ ░██░██ ░██\n" + - "░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████ \n" + - "░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░ \n"; + "░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████\n" + + "░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░\n" + + "\n" + + "* Maintainer: tengjiqi@gmail.com & PowerJob-Team\n" + + "* OfficialWebsite: http://www.powerjob.tech/\n" + + "* SourceCode: https://github.com/KFCFans/PowerJob\n" + + "\n"; public static void print() { log.info(BANNER); String version = OmsWorkerVersion.getVersion(); version = (version != null) ? " (v" + version + ")" : ""; - log.info(":: OhMyScheduler Worker :: {}", version); + log.info(":: PowerJob Worker :: {}", version); } } From 92b9f9668fce1507796061116db87d8026bf8ed9 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 11:12:17 +0800 Subject: [PATCH 10/17] feat: change pom from 3.2.3 to 3.3.0, It's time to fix dependency problem --- powerjob-client/pom.xml | 4 ++-- powerjob-common/pom.xml | 2 +- powerjob-server/pom.xml | 4 ++-- powerjob-worker-agent/pom.xml | 4 ++-- powerjob-worker-samples/pom.xml | 4 ++-- powerjob-worker-spring-boot-starter/pom.xml | 4 ++-- powerjob-worker/pom.xml | 4 ++-- 7 files changed, 13 insertions(+), 13 deletions(-) diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index d1e080d8..76a28752 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-client - 3.2.3 + 3.3.0 jar - 3.2.3 + 3.3.0 5.6.1 diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index 8aec6af7..1f360c10 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.2.3 + 3.3.0 jar diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index a6434cec..672af6e2 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.2.3 + 3.3.0 jar 2.9.2 2.2.6.RELEASE - 3.2.3 + 3.3.0 8.0.19 19.7.0.0 diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index 68ee9f0c..d95e203f 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker-agent - 3.2.3 + 3.3.0 jar - 3.2.3 + 3.3.0 1.2.3 4.3.2 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index 80a5d7e2..86835a94 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-samples - 3.2.3 + 3.3.0 2.2.6.RELEASE - 3.2.3 + 3.3.0 1.2.68 diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index 60a42a73..d5ba6e8e 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-spring-boot-starter - 3.2.3 + 3.3.0 jar - 3.2.3 + 3.3.0 2.2.6.RELEASE diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index e0ac3936..44d95e5d 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker - 3.2.3 + 3.3.0 jar 5.2.4.RELEASE - 3.2.3 + 3.3.0 1.4.200 3.4.2 5.6.1 From caebb2d87a687a02c1adf1dd806aae67b43e976e Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 12:24:32 +0800 Subject: [PATCH 11/17] refacotr: okhttp v4 -> v3 --- .../java/com/github/kfcfans/powerjob/client/OhMyClient.java | 4 ++-- powerjob-common/pom.xml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index 5a573cbb..62fa3179 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -108,7 +108,7 @@ public class OhMyClient { request.setAppId(appId); MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); String json = JsonUtils.toJSONStringUnsafe(request); - String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(json, jsonType)); + String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json)); return JsonUtils.parseObject(post, ResultDTO.class); } @@ -283,7 +283,7 @@ public class OhMyClient { request.setAppId(appId); MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); String json = JsonUtils.toJSONStringUnsafe(request); - String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(json, jsonType)); + String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json)); return JsonUtils.parseObject(post, ResultDTO.class); } diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index 1f360c10..947b04b9 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -18,7 +18,7 @@ 3.10 2.6 29.0-jre - 4.4.1 + 3.14.9 2.6.4 5.6.1 From 8361ddc4e514a9e9a270e3a0aa79a963fb3b360d Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 12:48:44 +0800 Subject: [PATCH 12/17] refactor: OmsException -> PowerJobException --- .../kfcfans/powerjob/client/OhMyClient.java | 8 ++--- .../kfcfans/powerjob/common/OmsException.java | 29 ------------------- .../powerjob/common/PowerJobException.java | 29 +++++++++++++++++++ .../powerjob/common/utils/CommonUtils.java | 6 ++-- .../powerjob/common/utils/JsonUtils.java | 4 +-- .../server/common/utils/DingTalkUtils.java | 6 ++-- .../server/common/utils/WorkflowDAGUtils.java | 8 ++--- .../server/service/AppInfoService.java | 6 ++-- .../alarm/impl/DingTalkAlarmService.java | 4 +-- .../service/ha/ServerSelectService.java | 4 +-- .../service/instance/InstanceService.java | 10 +++---- .../workflow/WorkflowInstanceService.java | 6 ++-- .../service/workflow/WorkflowService.java | 6 ++-- .../web/ControllerExceptionHandler.java | 4 +-- .../web/request/ModifyAppInfoRequest.java | 4 +-- .../kfcfans/powerjob/worker/OhMyWorker.java | 8 ++--- .../worker/container/OmsJarContainer.java | 6 ++-- .../tracker/processor/ProcessorTracker.java | 4 +-- .../core/tracker/task/CommonTaskTracker.java | 2 +- .../tracker/task/FrequentTaskTracker.java | 2 +- 20 files changed, 78 insertions(+), 78 deletions(-) delete mode 100644 powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsException.java create mode 100644 powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobException.java diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index 62fa3179..e2c9d25f 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.client; import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.OpenAPIConstant; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; @@ -68,7 +68,7 @@ public class OhMyClient { currentAddress = addr; break; }else { - throw new OmsException(resultDTO.getMessage()); + throw new PowerJobException(resultDTO.getMessage()); } } }catch (IOException ignore) { @@ -76,7 +76,7 @@ public class OhMyClient { } if (StringUtils.isEmpty(currentAddress)) { - throw new OmsException("no server available"); + throw new PowerJobException("no server available"); } log.info("[OhMyClient] {}'s oms-client bootstrap successfully, using server: {}", appName, currentAddress); } @@ -426,6 +426,6 @@ public class OhMyClient { } log.error("[OhMyClient] do post for path: {} failed because of no server available in {}.", path, allAddress); - throw new OmsException("no server available when send post"); + throw new PowerJobException("no server available when send post"); } } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsException.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsException.java deleted file mode 100644 index 3d7db8c3..00000000 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsException.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.github.kfcfans.powerjob.common; - -/** - * OhMyScheduler 运行时异常 - * - * @author tjq - * @since 2020/5/26 - */ -public class OmsException extends RuntimeException { - - public OmsException() { - } - - public OmsException(String message) { - super(message); - } - - public OmsException(String message, Throwable cause) { - super(message, cause); - } - - public OmsException(Throwable cause) { - super(cause); - } - - public OmsException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobException.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobException.java new file mode 100644 index 00000000..5d8aba04 --- /dev/null +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobException.java @@ -0,0 +1,29 @@ +package com.github.kfcfans.powerjob.common; + +/** + * PowerJob 运行时异常 + * + * @author tjq + * @since 2020/5/26 + */ +public class PowerJobException extends RuntimeException { + + public PowerJobException() { + } + + public PowerJobException(String message) { + super(message); + } + + public PowerJobException(String message, Throwable cause) { + super(message, cause); + } + + public PowerJobException(Throwable cause) { + super(cause); + } + + public PowerJobException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java index 08da3c00..78c1973a 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.common.utils; import com.github.kfcfans.powerjob.common.OmsConstant; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; @@ -121,11 +121,11 @@ public class CommonUtils { public static T requireNonNull(T obj, String msg) { if (obj == null) { - throw new OmsException(msg); + throw new PowerJobException(msg); } if (obj instanceof String) { if (StringUtils.isEmpty((String) obj)) { - throw new OmsException(msg); + throw new PowerJobException(msg); } } return obj; diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java index 4cf47bfc..3d0a61ae 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java @@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.common.utils; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import org.apache.commons.lang3.exception.ExceptionUtils; /** @@ -54,6 +54,6 @@ public class JsonUtils { }catch (Exception e) { ExceptionUtils.rethrow(e); } - throw new OmsException("impossible"); + throw new PowerJobException("impossible"); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java index 366800d6..6196d1cb 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java @@ -7,7 +7,7 @@ import com.dingtalk.api.request.OapiMessageCorpconversationAsyncsendV2Request; import com.dingtalk.api.request.OapiUserGetByMobileRequest; import com.dingtalk.api.response.OapiGettokenResponse; import com.dingtalk.api.response.OapiUserGetByMobileResponse; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -52,7 +52,7 @@ public class DingTalkUtils implements Closeable { refreshAccessToken(appKey, appSecret); if (StringUtils.isEmpty(accessToken)) { - throw new OmsException("fetch AccessToken failed, please check your appKey & appSecret"); + throw new PowerJobException("fetch AccessToken failed, please check your appKey & appSecret"); } scheduledPool = Executors.newSingleThreadScheduledExecutor(); @@ -91,7 +91,7 @@ public class DingTalkUtils implements Closeable { return execute.getUserid(); } log.info("[DingTalkUtils] fetch userId by mobile({}) failed,reason is {}.", mobile, execute.getErrmsg()); - throw new OmsException("fetch userId by phone number failed, reason is " + execute.getErrmsg()); + throw new PowerJobException("fetch userId by phone number failed, reason is " + execute.getErrmsg()); } public void sendMarkdownAsync(String title, List entities, String userList, Long agentId) throws Exception { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java index e96a913a..2a98e9f9 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.server.common.utils; import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.server.model.WorkflowDAG; @@ -76,7 +76,7 @@ public class WorkflowDAGUtils { Map id2Node = Maps.newHashMap(); if (PEWorkflowDAG.getNodes() == null || PEWorkflowDAG.getNodes().isEmpty()) { - throw new OmsException("empty graph"); + throw new PowerJobException("empty graph"); } // 创建节点 @@ -95,7 +95,7 @@ public class WorkflowDAGUtils { WorkflowDAG.Node to = id2Node.get(edge.getTo()); if (from == null || to == null) { - throw new OmsException("Illegal Edge: " + JsonUtils.toJSONString(edge)); + throw new PowerJobException("Illegal Edge: " + JsonUtils.toJSONString(edge)); } from.getSuccessors().add(to); @@ -106,7 +106,7 @@ public class WorkflowDAGUtils { // 合法性校验(至少存在一个顶点) if (rootIds.size() < 1) { - throw new OmsException("Illegal DAG: " + JsonUtils.toJSONString(PEWorkflowDAG)); + throw new PowerJobException("Illegal DAG: " + JsonUtils.toJSONString(PEWorkflowDAG)); } List roots = Lists.newLinkedList(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/AppInfoService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/AppInfoService.java index 2b0ec22a..2f8b0bb3 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/AppInfoService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/AppInfoService.java @@ -1,6 +1,6 @@ package com.github.kfcfans.powerjob.server.service; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import org.springframework.stereotype.Service; @@ -28,11 +28,11 @@ public class AppInfoService { */ public Long assertApp(String appName, String password) { - AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new OmsException("can't find appInfo by appName: " + appName)); + AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new PowerJobException("can't find appInfo by appName: " + appName)); if (Objects.equals(appInfo.getPassword(), password)) { return appInfo.getId(); } - throw new OmsException("password error!"); + throw new PowerJobException("password error!"); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java index eedbf5ac..aa01d08f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.server.service.alarm.impl; import com.github.kfcfans.powerjob.common.OmsConstant; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; import com.github.kfcfans.powerjob.server.common.SJ; @@ -55,7 +55,7 @@ public class DingTalkAlarmService implements Alarmable { String userId = mobile2UserIdCache.get(user.getPhone(), () -> { try { return dingTalkUtils.fetchUserIdByMobile(user.getPhone()); - } catch (OmsException ignore) { + } catch (PowerJobException ignore) { return EMPTY_TAG; } catch (Exception ignore) { return null; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java index e9037176..e82760b8 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java @@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.ha; import akka.actor.ActorSelection; import akka.pattern.Patterns; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.akka.requests.Ping; @@ -56,7 +56,7 @@ public class ServerSelectService { // 无锁获取当前数据库中的Server Optional appInfoOpt = appInfoRepository.findById(appId); if (!appInfoOpt.isPresent()) { - throw new OmsException(appId + " is not registered!"); + throw new PowerJobException(appId + " is not registered!"); } String appName = appInfoOpt.get().getAppName(); String originServer = appInfoOpt.get().getCurrentServer(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java index a9ebc414..bab39209 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java @@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.server.service.instance; import akka.actor.ActorSelection; import akka.pattern.Patterns; import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.SystemInstanceResult; import com.github.kfcfans.powerjob.common.model.InstanceDetail; @@ -135,11 +135,11 @@ public class InstanceService { public void retryInstance(Long instanceId) { InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId); if (!InstanceStatus.finishedStatus.contains(instanceInfo.getStatus())) { - throw new OmsException("Only stopped instance can be retry!"); + throw new PowerJobException("Only stopped instance can be retry!"); } // 暂时不支持工作流任务的重试 if (instanceInfo.getWfInstanceId() != null) { - throw new OmsException("Workflow's instance do not support retry!"); + throw new PowerJobException("Workflow's instance do not support retry!"); } instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); @@ -152,7 +152,7 @@ public class InstanceService { // 派发任务 Long jobId = instanceInfo.getJobId(); - JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new OmsException("can't find job info by jobId: " + jobId)); + JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new PowerJobException("can't find job info by jobId: " + jobId)); dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes()); } @@ -187,7 +187,7 @@ public class InstanceService { log.info("[Instance-{}] cancel the instance successfully.", instanceId); }else { log.warn("[Instance-{}] cancel the instance failed.", instanceId); - throw new OmsException("instance already up and running"); + throw new PowerJobException("instance already up and running"); } }catch (Exception e) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java index 08a20494..3be885c4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java @@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.workflow; import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.SystemInstanceResult; import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; @@ -43,7 +43,7 @@ public class WorkflowInstanceService { public void stopWorkflowInstance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { - throw new OmsException("workflow instance already stopped"); + throw new PowerJobException("workflow instance already stopped"); } // 停止所有已启动且未完成的服务 PEWorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); @@ -80,7 +80,7 @@ public class WorkflowInstanceService { public WorkflowInstanceInfoDO fetchWfInstance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId)); if (!Objects.equals(appId, wfInstance.getAppId())) { - throw new OmsException("Permission Denied!"); + throw new PowerJobException("Permission Denied!"); } return wfInstance; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java index 63e224b5..a2e0a78d 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.server.service.workflow; import com.alibaba.fastjson.JSONObject; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.response.WorkflowInfoDTO; @@ -42,7 +42,7 @@ public class WorkflowService { req.valid(); if (!WorkflowDAGUtils.valid(req.getPEWorkflowDAG())) { - throw new OmsException("illegal DAG"); + throw new PowerJobException("illegal DAG"); } Long wfId = req.getId(); @@ -145,7 +145,7 @@ public class WorkflowService { private WorkflowInfoDO permissionCheck(Long wfId, Long appId) { WorkflowInfoDO wfInfo = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id: " + wfId)); if (!wfInfo.getAppId().equals(appId)) { - throw new OmsException("Permission Denied!can't delete other appId's workflow!"); + throw new PowerJobException("Permission Denied!can't delete other appId's workflow!"); } return wfInfo; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/ControllerExceptionHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/ControllerExceptionHandler.java index 3ec9821d..54c3e75e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/ControllerExceptionHandler.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/ControllerExceptionHandler.java @@ -1,6 +1,6 @@ package com.github.kfcfans.powerjob.server.web; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.response.ResultDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.http.converter.HttpMessageNotReadableException; @@ -25,7 +25,7 @@ public class ControllerExceptionHandler { public ResultDTO exceptionHandler(Exception e) { // 不是所有异常都需要打印完整堆栈,后续可以定义内部的Exception,便于判断 - if (e instanceof IllegalArgumentException || e instanceof OmsException) { + if (e instanceof IllegalArgumentException || e instanceof PowerJobException) { log.warn("[ControllerException] http request failed, message is {}.", e.getMessage()); } else if (e instanceof HttpMessageNotReadableException || e instanceof MethodArgumentTypeMismatchException) { log.warn("[ControllerException] invalid http request params, exception is {}.", e.getMessage()); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/ModifyAppInfoRequest.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/ModifyAppInfoRequest.java index 77ea5da3..bd007626 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/ModifyAppInfoRequest.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/ModifyAppInfoRequest.java @@ -1,6 +1,6 @@ package com.github.kfcfans.powerjob.server.web.request; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import lombok.Data; import org.apache.commons.lang3.StringUtils; @@ -21,7 +21,7 @@ public class ModifyAppInfoRequest { public void valid() { CommonUtils.requireNonNull(appName, "appName can't be empty"); if (StringUtils.containsWhitespace(appName)) { - throw new OmsException("appName can't contains white space!"); + throw new PowerJobException("appName can't contains white space!"); } } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java index 3a78f2f9..1cde2378 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java @@ -5,7 +5,7 @@ import akka.actor.ActorSystem; import akka.actor.DeadLetter; import akka.actor.Props; import akka.routing.RoundRobinPool; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.utils.CommonUtils; @@ -163,16 +163,16 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di return appId; }else { log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName); - throw new OmsException(resultDTO.getMessage()); + throw new PowerJobException(resultDTO.getMessage()); } - }catch (OmsException oe) { + }catch (PowerJobException oe) { throw oe; }catch (Exception ignore) { log.warn("[OhMyWorker] assert appName by url({}) failed, please check the server address.", realUrl); } } log.error("[OhMyWorker] no available server in {}.", config.getServerAddress()); - throw new OmsException("no server available!"); + throw new PowerJobException("no server available!"); } @Override diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java index 6dfacce6..8efe4adf 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.worker.container; import com.github.kfcfans.powerjob.common.ContainerConstant; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; @@ -106,7 +106,7 @@ public class OmsJarContainer implements OmsContainer { if (propertiesURLStream == null) { log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath()); - throw new OmsException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME); + throw new PowerJobException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME); } properties.load(propertiesURLStream); @@ -115,7 +115,7 @@ public class OmsJarContainer implements OmsContainer { String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY); if (StringUtils.isEmpty(packageName)) { log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId); - throw new OmsException("invalid jar file"); + throw new PowerJobException("invalid jar file"); } // 加载用户类 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index e9957127..38d4937f 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -318,12 +318,12 @@ public class ProcessorTracker { break; default: log.warn("[ProcessorTracker-{}] unknown processor type: {}.", instanceId, processorType); - throw new OmsException("unknown processor type of " + processorType); + throw new PowerJobException("unknown processor type of " + processorType); } if (processor == null) { log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo); - throw new OmsException("fetch Processor failed"); + throw new PowerJobException("fetch Processor failed"); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index 01e1f9ac..e3f6a08e 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -110,7 +110,7 @@ public class CommonTaskTracker extends TaskTracker { log.info("[TaskTracker-{}] create root task successfully.", instanceId); }else { log.error("[TaskTracker-{}] create root task failed.", instanceId); - throw new OmsException("create root task failed for instance: " + instanceId); + throw new PowerJobException("create root task failed for instance: " + instanceId); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index 5f797461..f56e9a62 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -91,7 +91,7 @@ public class FrequentTaskTracker extends TaskTracker { if (timeExpressionType == TimeExpressionType.FIX_RATE) { // 固定频率需要设置最小间隔 if (timeParams < MIN_INTERVAL) { - throw new OmsException("time interval too small, please set the timeExpressionInfo >= 1000"); + throw new PowerJobException("time interval too small, please set the timeExpressionInfo >= 1000"); } scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS); }else { From a138e05404b3ba20884d9264e814fa46bfe220b6 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 13:31:58 +0800 Subject: [PATCH 13/17] feat: add Workflow initPrams #58 --- .../kfcfans/powerjob/client/OhMyClient.java | 17 +++++++++++++---- .../core/model/WorkflowInstanceInfoDO.java | 5 +++++ .../timing/InstanceStatusCheckService.java | 2 +- .../timing/schedule/OmsScheduleService.java | 4 ++-- .../workflow/WorkflowInstanceManager.java | 17 +++++++++++++---- .../service/workflow/WorkflowService.java | 14 ++++++++++---- .../web/controller/OpenAPIController.java | 4 ++-- .../web/controller/WorkflowController.java | 2 +- 8 files changed, 47 insertions(+), 18 deletions(-) diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index e2c9d25f..600f49a9 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -349,17 +349,26 @@ public class OhMyClient { /** * 运行工作流 - * @param workflowId workflowId + * @param workflowId 工作流ID + * @param initParams 启动参数 + * @param delay 延迟时间,单位 MS * @return 工作流实例ID - * @throws Exception 异常 + * @throws Exception 异常信息 */ - public ResultDTO runWorkflow(Long workflowId) throws Exception { + public ResultDTO runWorkflow(Long workflowId, String initParams, long delay) throws Exception { FormBody.Builder builder = new FormBody.Builder() .add("workflowId", workflowId.toString()) - .add("appId", appId.toString()); + .add("appId", appId.toString()) + .add("delay", String.valueOf(delay)); + if (StringUtils.isNotEmpty(initParams)) { + builder.add("initParams", initParams); + } String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build()); return JsonUtils.parseObject(post, ResultDTO.class); } + public ResultDTO runWorkflow(Long workflowId) throws Exception { + return runWorkflow(workflowId, null, 0); + } /* ************* Workflow Instance 区 ************* */ /** diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java index 5eacf323..c2a074a4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java @@ -36,6 +36,11 @@ public class WorkflowInstanceInfoDO { // workflow 状态(WorkflowInstanceStatus) private Integer status; + // 工作流启动参数 + @Lob + @Column + private String wfInitParams; + @Lob @Column private String dag; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java index b846ccbc..489f2874 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java @@ -171,7 +171,7 @@ public class InstanceStatusCheckService { waitingWfInstanceList.forEach(wfInstance -> { Optional workflowOpt = workflowInfoRepository.findById(wfInstance.getWorkflowId()); workflowOpt.ifPresent(workflowInfo -> { - workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId()); + workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId(), wfInstance.getWfInitParams()); log.info("[Workflow-{}|{}] restart workflowInstance successfully~", workflowInfo.getId(), wfInstance.getWfInstanceId()); }); }); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index 6e09a62a..3da63e5d 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -207,7 +207,7 @@ public class OmsScheduleService { wfInfos.forEach(wfInfo -> { // 1. 先生成调度记录,防止不调度的情况发生 - Long wfInstanceId = workflowInstanceManager.create(wfInfo); + Long wfInstanceId = workflowInstanceManager.create(wfInfo, null); // 2. 推入时间轮,准备调度执行 long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis(); @@ -215,7 +215,7 @@ public class OmsScheduleService { log.warn("[Workflow-{}] workflow schedule delay, expect:{}, actual: {}", wfInfo.getId(), wfInfo.getNextTriggerTime(), System.currentTimeMillis()); delay = 0; } - InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId)); + InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, null)); // 3. 重新计算下一次调度时间并更新 try { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index d9e7fab2..37ca9396 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -67,9 +67,10 @@ public class WorkflowInstanceManager { /** * 创建工作流任务实例 * @param wfInfo 工作流任务元数据(描述信息) + * @param initParams 启动参数 * @return wfInstanceId */ - public Long create(WorkflowInfoDO wfInfo) { + public Long create(WorkflowInfoDO wfInfo, String initParams) { Long wfId = wfInfo.getId(); Long wfInstanceId = idGenerateService.allocate(); @@ -82,6 +83,7 @@ public class WorkflowInstanceManager { newWfInstance.setWorkflowId(wfId); newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); newWfInstance.setActualTriggerTime(System.currentTimeMillis()); + newWfInstance.setWfInitParams(initParams); newWfInstance.setGmtCreate(now); newWfInstance.setGmtModified(now); @@ -107,8 +109,9 @@ public class WorkflowInstanceManager { * 开始任务 * @param wfInfo 工作流任务信息 * @param wfInstanceId 工作流任务实例ID + * @param initParams 启动参数 */ - public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) { + public void start(WorkflowInfoDO wfInfo, Long wfInstanceId, String initParams) { Optional wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId); if (!wfInstanceInfoOpt.isPresent()) { @@ -132,13 +135,19 @@ public class WorkflowInstanceManager { try { + // 构建根任务启动参数(为了精简 worker 端实现,启动参数仍以 instanceParams 字段承接) + Map preJobId2Result = Maps.newHashMap(); + // 模拟 preJobId -> preJobResult 的格式,-1 代表前置任务不存在 + preJobId2Result.put("-1", initParams); + String wfRootInstanceParams = JSONObject.toJSONString(preJobId2Result); + PEWorkflowDAG peWorkflowDAG = JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); List roots = WorkflowDAGUtils.listRoots(peWorkflowDAG); peWorkflowDAG.getNodes().forEach(node -> node.setStatus(InstanceStatus.WAITING_DISPATCH.getV())); // 创建所有的根任务 roots.forEach(root -> { - Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis()); + Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), wfRootInstanceParams, wfInstanceId, System.currentTimeMillis()); root.setInstanceId(instanceId); root.setStatus(InstanceStatus.RUNNING.getV()); @@ -152,7 +161,7 @@ public class WorkflowInstanceManager { log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId); // 真正开始执行根任务 - roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, null)); + roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, wfRootInstanceParams)); }catch (Exception e) { log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java index a2e0a78d..073d7491 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java @@ -11,6 +11,7 @@ import com.github.kfcfans.powerjob.server.common.utils.CronExpression; import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils; import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository; +import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; @@ -130,15 +131,20 @@ public class WorkflowService { * 立即运行工作流 * @param wfId 工作流ID * @param appId 所属应用ID + * @param initParams 启动参数 + * @param delay 延迟时间 * @return 该 workflow 实例的 instanceId(wfInstanceId) */ - public Long runWorkflow(Long wfId, Long appId) { + public Long runWorkflow(Long wfId, Long appId, String initParams, long delay) { WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); - Long wfInstanceId = workflowInstanceManager.create(wfInfo); + Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams); - // 正式启动任务 - workflowInstanceManager.start(wfInfo, wfInstanceId); + if (delay <= 0) { + workflowInstanceManager.start(wfInfo, wfInstanceId, initParams); + }else { + InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, initParams)); + } return wfInstanceId; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java index 40fd0b6a..a846f73c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java @@ -150,8 +150,8 @@ public class OpenAPIController { } @PostMapping(OpenAPIConstant.RUN_WORKFLOW) - public ResultDTO runWorkflow(Long workflowId, Long appId) { - return ResultDTO.success(workflowService.runWorkflow(workflowId, appId)); + public ResultDTO runWorkflow(Long workflowId, Long appId, @RequestParam(required = false) String initParams, @RequestParam(required = false) Long delay) { + return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, initParams, delay == null ? 0 : delay)); } /* ************* Workflow Instance 区 ************* */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowController.java index 25095922..9eb1b589 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowController.java @@ -79,7 +79,7 @@ public class WorkflowController { @GetMapping("/run") public ResultDTO runWorkflow(Long workflowId, Long appId) { - return ResultDTO.success(workflowService.runWorkflow(workflowId, appId)); + return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, null, 0)); } private static PageResult convertPage(Page originPage) { From fb61ad0bc65ffb333da64efc911773bf9ffb5fb1 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 11:19:24 +0800 Subject: [PATCH 14/17] fix: fixed-cron-expression schedule #64 --- .../powerjob/server/service/JobService.java | 10 ++- .../timing/schedule/OmsScheduleService.java | 63 +++++++++++-------- 2 files changed, 45 insertions(+), 28 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 9b8a874c..9ae00a1c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.server.service; import com.github.kfcfans.powerjob.common.InstanceStatus; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.response.JobInfoDTO; @@ -78,7 +79,7 @@ public class JobService { jobInfoDO.setNotifyUserIds(SJ.commaJoiner.join(request.getNotifyUserIds())); } - refreshJob(jobInfoDO); + calculateNextTriggerTime(jobInfoDO); if (request.getId() == null) { jobInfoDO.setGmtCreate(new Date()); } @@ -143,7 +144,7 @@ public class JobService { JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId)); jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV()); - refreshJob(jobInfoDO); + calculateNextTriggerTime(jobInfoDO); jobInfoRepository.saveAndFlush(jobInfoDO); } @@ -184,7 +185,7 @@ public class JobService { }); } - private void refreshJob(JobInfoDO jobInfoDO) throws Exception { + private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws Exception { // 计算下次调度时间 Date now = new Date(); TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); @@ -192,6 +193,9 @@ public class JobService { if (timeExpressionType == TimeExpressionType.CRON) { CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); Date nextValidTime = cronExpression.getNextValidTimeAfter(now); + if (nextValidTime == null) { + throw new PowerJobException("invalid cron expression: " + jobInfoDO.getTimeExpression()); + } jobInfoDO.setNextTriggerTime(nextValidTime.getTime()); }else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) { jobInfoDO.setTimeExpression(null); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index 3da63e5d..2115a664 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -121,8 +121,6 @@ public class OmsScheduleService { */ private void scheduleCronJob(List appIds) { - - Date now = new Date(); long nowTime = System.currentTimeMillis(); long timeThreshold = nowTime + 2 * SCHEDULE_RATE; Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> { @@ -165,24 +163,13 @@ public class OmsScheduleService { }); // 3. 计算下一次调度时间(忽略5S内的重复执行,即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms) - List updatedJobInfos = Lists.newLinkedList(); jobInfos.forEach(jobInfoDO -> { - try { - - Date nextTriggerTime = calculateNextTriggerTime(jobInfoDO.getNextTriggerTime(), jobInfoDO.getTimeExpression()); - - JobInfoDO updatedJobInfo = new JobInfoDO(); - BeanUtils.copyProperties(jobInfoDO, updatedJobInfo); - updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime()); - updatedJobInfo.setGmtModified(now); - - updatedJobInfos.add(updatedJobInfo); + refreshJob(jobInfoDO); } catch (Exception e) { - log.error("[Job-{}] calculate next trigger time failed.", jobInfoDO.getId(), e); + log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e); } }); - jobInfoRepository.saveAll(updatedJobInfos); jobInfoRepository.flush(); @@ -203,7 +190,6 @@ public class OmsScheduleService { return; } - Date now = new Date(); wfInfos.forEach(wfInfo -> { // 1. 先生成调度记录,防止不调度的情况发生 @@ -219,16 +205,9 @@ public class OmsScheduleService { // 3. 重新计算下一次调度时间并更新 try { - Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); - - WorkflowInfoDO updateEntity = new WorkflowInfoDO(); - BeanUtils.copyProperties(wfInfo, updateEntity); - - updateEntity.setNextTriggerTime(nextTriggerTime.getTime()); - updateEntity.setGmtModified(now); - workflowInfoRepository.save(updateEntity); + refreshWorkflow(wfInfo); }catch (Exception e) { - log.error("[Workflow-{}] parse cron failed.", wfInfo.getId(), e); + log.error("[Workflow-{}] refresh workflow failed.", wfInfo.getId(), e); } }); workflowInfoRepository.flush(); @@ -264,6 +243,40 @@ public class OmsScheduleService { }); } + private void refreshJob(JobInfoDO jobInfo) throws Exception { + Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression()); + + JobInfoDO updatedJobInfo = new JobInfoDO(); + BeanUtils.copyProperties(jobInfo, updatedJobInfo); + + if (nextTriggerTime == null) { + log.warn("[Job-{}] this job won't be scheduled anymore, system will set the status to DISABLE!", jobInfo.getId()); + updatedJobInfo.setStatus(SwitchableStatus.DISABLE.getV()); + }else { + updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime()); + } + updatedJobInfo.setGmtModified(new Date()); + + jobInfoRepository.save(updatedJobInfo); + } + + private void refreshWorkflow(WorkflowInfoDO wfInfo) throws Exception { + Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); + + WorkflowInfoDO updateEntity = new WorkflowInfoDO(); + BeanUtils.copyProperties(wfInfo, updateEntity); + + if (nextTriggerTime == null) { + log.warn("[Workflow-{}] this workflow won't be scheduled anymore, system will set the status to DISABLE!", wfInfo.getId()); + wfInfo.setStatus(SwitchableStatus.DISABLE.getV()); + }else { + updateEntity.setNextTriggerTime(nextTriggerTime.getTime()); + } + + updateEntity.setGmtModified(new Date()); + workflowInfoRepository.save(updateEntity); + } + /** * 计算下次触发时间 * @param preTriggerTime 前一次触发时间 From 8560112432b3bcc7bb0aa83c30080ad49096322c Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 12:40:10 +0800 Subject: [PATCH 15/17] feat: update sql for new DO --- others/oms-sql.sql | 35 ++++++++++--------- .../src/test/java/TestWorkflow.java | 32 +++++++++++++++-- .../resources/application-daily.properties | 2 +- 3 files changed, 49 insertions(+), 20 deletions(-) diff --git a/others/oms-sql.sql b/others/oms-sql.sql index 332cd8ee..1a934017 100644 --- a/others/oms-sql.sql +++ b/others/oms-sql.sql @@ -1,15 +1,17 @@ /* Navicat Premium Data Transfer + Source Server : Local MySQL Source Server Type : MySQL - Source Server Version : 80020 - Source Schema : powerjob-product + Source Server Version : 80021 + Source Host : localhost:3306 + Source Schema : powerjob-daily Target Server Type : MySQL - Target Server Version : 80020 + Target Server Version : 80021 File Encoding : 65001 - Date: 23/06/2020 22:30:06 + Date: 08/10/2020 12:39:10 */ SET NAMES utf8mb4; @@ -28,7 +30,7 @@ CREATE TABLE `app_info` ( `password` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `appNameUK` (`app_name`) -) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for container_info @@ -62,10 +64,10 @@ CREATE TABLE `instance_info` ( `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, `instance_id` bigint DEFAULT NULL, - `instance_params` text, + `instance_params` longtext, `job_id` bigint DEFAULT NULL, `last_report_time` bigint DEFAULT NULL, - `result` text, + `result` longtext, `running_times` bigint DEFAULT NULL, `status` int DEFAULT NULL, `task_tracker_address` varchar(255) DEFAULT NULL, @@ -75,7 +77,7 @@ CREATE TABLE `instance_info` ( KEY `IDX5b1nhpe5je7gc5s1ur200njr7` (`job_id`), KEY `IDXjnji5lrr195kswk6f7mfhinrs` (`app_id`), KEY `IDXa98hq3yu0l863wuotdjl7noum` (`instance_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for job_info @@ -101,7 +103,7 @@ CREATE TABLE `job_info` ( `min_memory_space` double NOT NULL, `next_trigger_time` bigint DEFAULT NULL, `notify_user_ids` varchar(255) DEFAULT NULL, - `processor_info` text, + `processor_info` longtext, `processor_type` int DEFAULT NULL, `status` int DEFAULT NULL, `task_retry_num` int DEFAULT NULL, @@ -109,7 +111,7 @@ CREATE TABLE `job_info` ( `time_expression_type` int DEFAULT NULL, PRIMARY KEY (`id`), KEY `IDXk2xprmn3lldmlcb52i36udll1` (`app_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for oms_lock @@ -124,7 +126,7 @@ CREATE TABLE `oms_lock` ( `ownerip` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `lockNameUK` (`lock_name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for server_info @@ -166,7 +168,7 @@ CREATE TABLE `workflow_info` ( `max_wf_instance_num` int DEFAULT NULL, `next_trigger_time` bigint DEFAULT NULL, `notify_user_ids` varchar(255) DEFAULT NULL, - `pedag` text, + `pedag` longtext, `status` int DEFAULT NULL, `time_expression` varchar(255) DEFAULT NULL, `time_expression_type` int DEFAULT NULL, @@ -174,7 +176,7 @@ CREATE TABLE `workflow_info` ( `wf_name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`), KEY `IDX7uo5w0e3beeho3fnx9t7eiol3` (`app_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for workflow_instance_info @@ -184,15 +186,16 @@ CREATE TABLE `workflow_instance_info` ( `id` bigint NOT NULL AUTO_INCREMENT, `actual_trigger_time` bigint DEFAULT NULL, `app_id` bigint DEFAULT NULL, - `dag` text, + `dag` longtext, `finished_time` bigint DEFAULT NULL, `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, - `result` text, + `result` longtext, `status` int DEFAULT NULL, + `wf_init_params` longtext, `wf_instance_id` bigint DEFAULT NULL, `workflow_id` bigint DEFAULT NULL, PRIMARY KEY (`id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; SET FOREIGN_KEY_CHECKS = 1; diff --git a/powerjob-client/src/test/java/TestWorkflow.java b/powerjob-client/src/test/java/TestWorkflow.java index 3b5f2f75..c99efc0e 100644 --- a/powerjob-client/src/test/java/TestWorkflow.java +++ b/powerjob-client/src/test/java/TestWorkflow.java @@ -1,7 +1,11 @@ import com.github.kfcfans.powerjob.client.OhMyClient; +import com.github.kfcfans.powerjob.common.ExecuteType; +import com.github.kfcfans.powerjob.common.ProcessorType; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; +import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; +import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.google.common.collect.Lists; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -20,7 +24,23 @@ public class TestWorkflow { @BeforeAll public static void initClient() throws Exception { - ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test", null); + ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123"); + } + + @Test + public void initTestData() throws Exception { + SaveJobInfoRequest base = new SaveJobInfoRequest(); + base.setJobName("DAG-Node-"); + base.setTimeExpressionType(TimeExpressionType.WORKFLOW); + base.setExecuteType(ExecuteType.STANDALONE); + base.setProcessorType(ProcessorType.EMBEDDED_JAVA); + base.setProcessorInfo("com.github.kfcfans.powerjob.samples.workflow.WorkflowStandaloneProcessor"); + + for (int i = 0; i < 5; i++) { + SaveJobInfoRequest request = JsonUtils.parseObject(JsonUtils.toBytes(base), SaveJobInfoRequest.class); + request.setJobName(request.getJobName() + i); + System.out.println(ohMyClient.saveJob(request)); + } } @Test @@ -30,8 +50,8 @@ public class TestWorkflow { List nodes = Lists.newLinkedList(); List edges = Lists.newLinkedList(); - nodes.add(new PEWorkflowDAG.Node(1L, "node-1")); - nodes.add(new PEWorkflowDAG.Node(2L, "node-2")); + nodes.add(new PEWorkflowDAG.Node(1L, "DAG-Node-1")); + nodes.add(new PEWorkflowDAG.Node(2L, "DAG-Node-2")); edges.add(new PEWorkflowDAG.Edge(1L, 2L)); @@ -81,4 +101,10 @@ public class TestWorkflow { public void testFetchWfInstanceInfo() throws Exception { System.out.println(ohMyClient.fetchWorkflowInstanceInfo(149962433421639744L)); } + + @Test + public void testRunWorkflowPlus() throws Exception { + System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 1", 0)); + System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 2", 30)); + } } diff --git a/powerjob-server/src/main/resources/application-daily.properties b/powerjob-server/src/main/resources/application-daily.properties index 5eca462b..6d83511f 100644 --- a/powerjob-server/src/main/resources/application-daily.properties +++ b/powerjob-server/src/main/resources/application-daily.properties @@ -11,7 +11,7 @@ spring.datasource.core.hikari.minimum-idle=5 ####### mongoDB配置,非核心依赖,通过配置 oms.mongodb.enable=false 来关闭 ####### oms.mongodb.enable=true -spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily +spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-daily ####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) ####### spring.mail.host=smtp.163.com From 763e416be3d2ed59daf026dcb8a8f5ce35e0ddc9 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 12:55:20 +0800 Subject: [PATCH 16/17] feat: optimize out of data's cron expression and passed the test --- .../powerjob/server/service/JobService.java | 2 +- .../web/controller/InstanceController.java | 5 ++-- .../powerjob/server/test/CronTest.java | 26 +++++++++++++++++++ 3 files changed, 30 insertions(+), 3 deletions(-) create mode 100644 powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/CronTest.java diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 9ae00a1c..ad81184f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -194,7 +194,7 @@ public class JobService { CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); Date nextValidTime = cronExpression.getNextValidTimeAfter(now); if (nextValidTime == null) { - throw new PowerJobException("invalid cron expression: " + jobInfoDO.getTimeExpression()); + throw new PowerJobException("cron expression is out of date: " + jobInfoDO.getTimeExpression()); } jobInfoDO.setNextTriggerTime(nextValidTime.getTime()); }else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java index 6f6f389c..300b55db 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.server.web.controller; import com.github.kfcfans.powerjob.common.InstanceStatus; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils; @@ -145,10 +146,10 @@ public class InstanceController { private String getTargetServer(Long instanceId) { InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); if (instanceInfo == null) { - throw new RuntimeException("invalid instanceId: " + instanceId); + throw new PowerJobException("invalid instanceId: " + instanceId); } Optional appInfoOpt = appInfoRepository.findById(instanceInfo.getAppId()); - return appInfoOpt.orElseThrow(() -> new RuntimeException("impossible")).getCurrentServer(); + return appInfoOpt.orElseThrow(() -> new PowerJobException("impossible")).getCurrentServer(); } } diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/CronTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/CronTest.java new file mode 100644 index 00000000..98316099 --- /dev/null +++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/CronTest.java @@ -0,0 +1,26 @@ +package com.github.kfcfans.powerjob.server.test; + + +import com.github.kfcfans.powerjob.server.common.utils.CronExpression; +import org.junit.Test; + +import java.util.Date; + +/** + * CRON 测试 + * + * @author tjq + * @since 2020/10/8 + */ +public class CronTest { + + private static final String FIXED_CRON = "0 0 13 8 10 ? 2020-2020"; + + @Test + public void testFixedTimeCron() throws Exception { + CronExpression cronExpression = new CronExpression(FIXED_CRON); + System.out.println(cronExpression.getCronExpression()); + System.out.println(cronExpression.getNextValidTimeAfter(new Date())); + } + +} From 71dbe05647330b509c7bd50592f6a2f20e3d5328 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 13:10:47 +0800 Subject: [PATCH 17/17] feat: optimize workflow OpenAPI and passed the test --- .../java/com/github/kfcfans/powerjob/client/OhMyClient.java | 6 +++--- powerjob-client/src/test/java/TestWorkflow.java | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index 600f49a9..a2f91a13 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -351,15 +351,15 @@ public class OhMyClient { * 运行工作流 * @param workflowId 工作流ID * @param initParams 启动参数 - * @param delay 延迟时间,单位 MS + * @param delayMS 延迟时间,单位毫秒 ms * @return 工作流实例ID * @throws Exception 异常信息 */ - public ResultDTO runWorkflow(Long workflowId, String initParams, long delay) throws Exception { + public ResultDTO runWorkflow(Long workflowId, String initParams, long delayMS) throws Exception { FormBody.Builder builder = new FormBody.Builder() .add("workflowId", workflowId.toString()) .add("appId", appId.toString()) - .add("delay", String.valueOf(delay)); + .add("delay", String.valueOf(delayMS)); if (StringUtils.isNotEmpty(initParams)) { builder.add("initParams", initParams); } diff --git a/powerjob-client/src/test/java/TestWorkflow.java b/powerjob-client/src/test/java/TestWorkflow.java index c99efc0e..0e888742 100644 --- a/powerjob-client/src/test/java/TestWorkflow.java +++ b/powerjob-client/src/test/java/TestWorkflow.java @@ -104,7 +104,6 @@ public class TestWorkflow { @Test public void testRunWorkflowPlus() throws Exception { - System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 1", 0)); - System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 2", 30)); + System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 2", 90000)); } }