From d5c26e70d8ac865810c060ed9d536d36b3f0b73c Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 10:45:57 +0800 Subject: [PATCH 01/20] feat: FrequentJob sort by subInstanceId #63 --- .../powerjob/worker/core/tracker/task/FrequentTaskTracker.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index 00cd28cc..5f797461 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -123,6 +123,9 @@ public class FrequentTaskTracker extends TaskTracker { history.add(subDetail); }); + // 按 subInstanceId 排序 issue#63 + history.sort((o1, o2) -> (int) (o2.getSubInstanceId() - o1.getSubInstanceId())); + detail.setSubInstanceDetails(history); return detail; } From d13bbc77dce31f6984c905cb4da7f2f78c11030c Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 10:48:54 +0800 Subject: [PATCH 02/20] fix: InstanceDTO don't have wfInstanceId --- .../kfcfans/powerjob/common/response/InstanceInfoDTO.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/InstanceInfoDTO.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/InstanceInfoDTO.java index 3f26e896..c95f25df 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/InstanceInfoDTO.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/InstanceInfoDTO.java @@ -20,12 +20,16 @@ public class InstanceInfoDTO { private Long appId; // 任务实例ID private Long instanceId; + // 工作流实例ID + private Long wfInstanceId; // 任务实例参数 private String instanceParams; /** * 任务状态 {@link InstanceStatus} */ private int status; + // 该任务实例的类型,普通/工作流(InstanceType) + private Integer type; // 执行结果 private String result; // 预计触发时间 From 33588a6d05e42601c852280d8814825862a7eb78 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 10:52:26 +0800 Subject: [PATCH 03/20] fix: Oracle ZHS16GBK #72 --- powerjob-server/pom.xml | 5 +++++ powerjob-server/src/main/resources/logback-product.xml | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 4a9e78a5..a6434cec 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -57,6 +57,11 @@ ojdbc8 ${ojdbc.version} + + com.oracle.database.nls + orai18n + ${ojdbc.version} + com.microsoft.sqlserver diff --git a/powerjob-server/src/main/resources/logback-product.xml b/powerjob-server/src/main/resources/logback-product.xml index 4fc5ea1e..9f63badf 100644 --- a/powerjob-server/src/main/resources/logback-product.xml +++ b/powerjob-server/src/main/resources/logback-product.xml @@ -8,6 +8,13 @@ --> + + + ${CONSOLE_LOG_PATTERN} + utf8 + + + ${LOG_PATH}/powerjob-server-error.log @@ -61,6 +68,7 @@ + From 3a32eaea04cfe8e35d2b8598fccb51a40e8da60e Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 11:02:23 +0800 Subject: [PATCH 04/20] feat: modify banner --- powerjob-server/src/main/resources/banner.txt | 3 ++- .../worker/common/OmsBannerPrinter.java | 21 ++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/powerjob-server/src/main/resources/banner.txt b/powerjob-server/src/main/resources/banner.txt index dffacc32..0d73c6d3 100644 --- a/powerjob-server/src/main/resources/banner.txt +++ b/powerjob-server/src/main/resources/banner.txt @@ -8,6 +8,7 @@ ${AnsiColor.GREEN} ░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████ ░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░ ${AnsiColor.BRIGHT_RED} -* Maintainer: tengjiqi@gmail.com +* Maintainer: tengjiqi@gmail.com & PowerJob-Team +* OfficialWebsite: http://www.powerjob.tech/ * SourceCode: https://github.com/KFCFans/PowerJob * PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OmsBannerPrinter.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OmsBannerPrinter.java index 1b1daf6b..227d5bec 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OmsBannerPrinter.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OmsBannerPrinter.java @@ -11,21 +11,28 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public final class OmsBannerPrinter { - private static final String BANNER = "\n ███████ ██ ██ \n" + - "░██░░░░██ ░██ ░██ \n" + - "░██ ░██ ██████ ███ ██ █████ ██████ ░██ ██████ ░██ \n" + - "░███████ ██░░░░██░░██ █ ░██ ██░░░██░░██░░█ ░██ ██░░░░██░██████ \n" + + private static final String BANNER = "" + + "\n" + + " ███████ ██ ██\n" + + "░██░░░░██ ░██ ░██\n" + + "░██ ░██ ██████ ███ ██ █████ ██████ ░██ ██████ ░██\n" + + "░███████ ██░░░░██░░██ █ ░██ ██░░░██░░██░░█ ░██ ██░░░░██░██████\n" + "░██░░░░ ░██ ░██ ░██ ███░██░███████ ░██ ░ ░██░██ ░██░██░░░██\n" + "░██ ░██ ░██ ░████░████░██░░░░ ░██ ██ ░██░██ ░██░██ ░██\n" + - "░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████ \n" + - "░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░ \n"; + "░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████\n" + + "░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░\n" + + "\n" + + "* Maintainer: tengjiqi@gmail.com & PowerJob-Team\n" + + "* OfficialWebsite: http://www.powerjob.tech/\n" + + "* SourceCode: https://github.com/KFCFans/PowerJob\n" + + "\n"; public static void print() { log.info(BANNER); String version = OmsWorkerVersion.getVersion(); version = (version != null) ? " (v" + version + ")" : ""; - log.info(":: OhMyScheduler Worker :: {}", version); + log.info(":: PowerJob Worker :: {}", version); } } From 92b9f9668fce1507796061116db87d8026bf8ed9 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 11:12:17 +0800 Subject: [PATCH 05/20] feat: change pom from 3.2.3 to 3.3.0, It's time to fix dependency problem --- powerjob-client/pom.xml | 4 ++-- powerjob-common/pom.xml | 2 +- powerjob-server/pom.xml | 4 ++-- powerjob-worker-agent/pom.xml | 4 ++-- powerjob-worker-samples/pom.xml | 4 ++-- powerjob-worker-spring-boot-starter/pom.xml | 4 ++-- powerjob-worker/pom.xml | 4 ++-- 7 files changed, 13 insertions(+), 13 deletions(-) diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index d1e080d8..76a28752 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-client - 3.2.3 + 3.3.0 jar - 3.2.3 + 3.3.0 5.6.1 diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index 8aec6af7..1f360c10 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.2.3 + 3.3.0 jar diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index a6434cec..672af6e2 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.2.3 + 3.3.0 jar 2.9.2 2.2.6.RELEASE - 3.2.3 + 3.3.0 8.0.19 19.7.0.0 diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index 68ee9f0c..d95e203f 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker-agent - 3.2.3 + 3.3.0 jar - 3.2.3 + 3.3.0 1.2.3 4.3.2 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index 80a5d7e2..86835a94 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-samples - 3.2.3 + 3.3.0 2.2.6.RELEASE - 3.2.3 + 3.3.0 1.2.68 diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index 60a42a73..d5ba6e8e 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-spring-boot-starter - 3.2.3 + 3.3.0 jar - 3.2.3 + 3.3.0 2.2.6.RELEASE diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index e0ac3936..44d95e5d 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker - 3.2.3 + 3.3.0 jar 5.2.4.RELEASE - 3.2.3 + 3.3.0 1.4.200 3.4.2 5.6.1 From caebb2d87a687a02c1adf1dd806aae67b43e976e Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 12:24:32 +0800 Subject: [PATCH 06/20] refacotr: okhttp v4 -> v3 --- .../java/com/github/kfcfans/powerjob/client/OhMyClient.java | 4 ++-- powerjob-common/pom.xml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index 5a573cbb..62fa3179 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -108,7 +108,7 @@ public class OhMyClient { request.setAppId(appId); MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); String json = JsonUtils.toJSONStringUnsafe(request); - String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(json, jsonType)); + String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json)); return JsonUtils.parseObject(post, ResultDTO.class); } @@ -283,7 +283,7 @@ public class OhMyClient { request.setAppId(appId); MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); String json = JsonUtils.toJSONStringUnsafe(request); - String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(json, jsonType)); + String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json)); return JsonUtils.parseObject(post, ResultDTO.class); } diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index 1f360c10..947b04b9 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -18,7 +18,7 @@ 3.10 2.6 29.0-jre - 4.4.1 + 3.14.9 2.6.4 5.6.1 From 8361ddc4e514a9e9a270e3a0aa79a963fb3b360d Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 12:48:44 +0800 Subject: [PATCH 07/20] refactor: OmsException -> PowerJobException --- .../kfcfans/powerjob/client/OhMyClient.java | 8 ++--- .../kfcfans/powerjob/common/OmsException.java | 29 ------------------- .../powerjob/common/PowerJobException.java | 29 +++++++++++++++++++ .../powerjob/common/utils/CommonUtils.java | 6 ++-- .../powerjob/common/utils/JsonUtils.java | 4 +-- .../server/common/utils/DingTalkUtils.java | 6 ++-- .../server/common/utils/WorkflowDAGUtils.java | 8 ++--- .../server/service/AppInfoService.java | 6 ++-- .../alarm/impl/DingTalkAlarmService.java | 4 +-- .../service/ha/ServerSelectService.java | 4 +-- .../service/instance/InstanceService.java | 10 +++---- .../workflow/WorkflowInstanceService.java | 6 ++-- .../service/workflow/WorkflowService.java | 6 ++-- .../web/ControllerExceptionHandler.java | 4 +-- .../web/request/ModifyAppInfoRequest.java | 4 +-- .../kfcfans/powerjob/worker/OhMyWorker.java | 8 ++--- .../worker/container/OmsJarContainer.java | 6 ++-- .../tracker/processor/ProcessorTracker.java | 4 +-- .../core/tracker/task/CommonTaskTracker.java | 2 +- .../tracker/task/FrequentTaskTracker.java | 2 +- 20 files changed, 78 insertions(+), 78 deletions(-) delete mode 100644 powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsException.java create mode 100644 powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobException.java diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index 62fa3179..e2c9d25f 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.client; import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.OpenAPIConstant; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; @@ -68,7 +68,7 @@ public class OhMyClient { currentAddress = addr; break; }else { - throw new OmsException(resultDTO.getMessage()); + throw new PowerJobException(resultDTO.getMessage()); } } }catch (IOException ignore) { @@ -76,7 +76,7 @@ public class OhMyClient { } if (StringUtils.isEmpty(currentAddress)) { - throw new OmsException("no server available"); + throw new PowerJobException("no server available"); } log.info("[OhMyClient] {}'s oms-client bootstrap successfully, using server: {}", appName, currentAddress); } @@ -426,6 +426,6 @@ public class OhMyClient { } log.error("[OhMyClient] do post for path: {} failed because of no server available in {}.", path, allAddress); - throw new OmsException("no server available when send post"); + throw new PowerJobException("no server available when send post"); } } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsException.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsException.java deleted file mode 100644 index 3d7db8c3..00000000 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsException.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.github.kfcfans.powerjob.common; - -/** - * OhMyScheduler 运行时异常 - * - * @author tjq - * @since 2020/5/26 - */ -public class OmsException extends RuntimeException { - - public OmsException() { - } - - public OmsException(String message) { - super(message); - } - - public OmsException(String message, Throwable cause) { - super(message, cause); - } - - public OmsException(Throwable cause) { - super(cause); - } - - public OmsException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobException.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobException.java new file mode 100644 index 00000000..5d8aba04 --- /dev/null +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobException.java @@ -0,0 +1,29 @@ +package com.github.kfcfans.powerjob.common; + +/** + * PowerJob 运行时异常 + * + * @author tjq + * @since 2020/5/26 + */ +public class PowerJobException extends RuntimeException { + + public PowerJobException() { + } + + public PowerJobException(String message) { + super(message); + } + + public PowerJobException(String message, Throwable cause) { + super(message, cause); + } + + public PowerJobException(Throwable cause) { + super(cause); + } + + public PowerJobException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java index 08da3c00..78c1973a 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.common.utils; import com.github.kfcfans.powerjob.common.OmsConstant; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; @@ -121,11 +121,11 @@ public class CommonUtils { public static T requireNonNull(T obj, String msg) { if (obj == null) { - throw new OmsException(msg); + throw new PowerJobException(msg); } if (obj instanceof String) { if (StringUtils.isEmpty((String) obj)) { - throw new OmsException(msg); + throw new PowerJobException(msg); } } return obj; diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java index 4cf47bfc..3d0a61ae 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java @@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.common.utils; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import org.apache.commons.lang3.exception.ExceptionUtils; /** @@ -54,6 +54,6 @@ public class JsonUtils { }catch (Exception e) { ExceptionUtils.rethrow(e); } - throw new OmsException("impossible"); + throw new PowerJobException("impossible"); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java index 366800d6..6196d1cb 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java @@ -7,7 +7,7 @@ import com.dingtalk.api.request.OapiMessageCorpconversationAsyncsendV2Request; import com.dingtalk.api.request.OapiUserGetByMobileRequest; import com.dingtalk.api.response.OapiGettokenResponse; import com.dingtalk.api.response.OapiUserGetByMobileResponse; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -52,7 +52,7 @@ public class DingTalkUtils implements Closeable { refreshAccessToken(appKey, appSecret); if (StringUtils.isEmpty(accessToken)) { - throw new OmsException("fetch AccessToken failed, please check your appKey & appSecret"); + throw new PowerJobException("fetch AccessToken failed, please check your appKey & appSecret"); } scheduledPool = Executors.newSingleThreadScheduledExecutor(); @@ -91,7 +91,7 @@ public class DingTalkUtils implements Closeable { return execute.getUserid(); } log.info("[DingTalkUtils] fetch userId by mobile({}) failed,reason is {}.", mobile, execute.getErrmsg()); - throw new OmsException("fetch userId by phone number failed, reason is " + execute.getErrmsg()); + throw new PowerJobException("fetch userId by phone number failed, reason is " + execute.getErrmsg()); } public void sendMarkdownAsync(String title, List entities, String userList, Long agentId) throws Exception { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java index e96a913a..2a98e9f9 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.server.common.utils; import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.server.model.WorkflowDAG; @@ -76,7 +76,7 @@ public class WorkflowDAGUtils { Map id2Node = Maps.newHashMap(); if (PEWorkflowDAG.getNodes() == null || PEWorkflowDAG.getNodes().isEmpty()) { - throw new OmsException("empty graph"); + throw new PowerJobException("empty graph"); } // 创建节点 @@ -95,7 +95,7 @@ public class WorkflowDAGUtils { WorkflowDAG.Node to = id2Node.get(edge.getTo()); if (from == null || to == null) { - throw new OmsException("Illegal Edge: " + JsonUtils.toJSONString(edge)); + throw new PowerJobException("Illegal Edge: " + JsonUtils.toJSONString(edge)); } from.getSuccessors().add(to); @@ -106,7 +106,7 @@ public class WorkflowDAGUtils { // 合法性校验(至少存在一个顶点) if (rootIds.size() < 1) { - throw new OmsException("Illegal DAG: " + JsonUtils.toJSONString(PEWorkflowDAG)); + throw new PowerJobException("Illegal DAG: " + JsonUtils.toJSONString(PEWorkflowDAG)); } List roots = Lists.newLinkedList(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/AppInfoService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/AppInfoService.java index 2b0ec22a..2f8b0bb3 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/AppInfoService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/AppInfoService.java @@ -1,6 +1,6 @@ package com.github.kfcfans.powerjob.server.service; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import org.springframework.stereotype.Service; @@ -28,11 +28,11 @@ public class AppInfoService { */ public Long assertApp(String appName, String password) { - AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new OmsException("can't find appInfo by appName: " + appName)); + AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new PowerJobException("can't find appInfo by appName: " + appName)); if (Objects.equals(appInfo.getPassword(), password)) { return appInfo.getId(); } - throw new OmsException("password error!"); + throw new PowerJobException("password error!"); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java index eedbf5ac..aa01d08f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.server.service.alarm.impl; import com.github.kfcfans.powerjob.common.OmsConstant; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; import com.github.kfcfans.powerjob.server.common.SJ; @@ -55,7 +55,7 @@ public class DingTalkAlarmService implements Alarmable { String userId = mobile2UserIdCache.get(user.getPhone(), () -> { try { return dingTalkUtils.fetchUserIdByMobile(user.getPhone()); - } catch (OmsException ignore) { + } catch (PowerJobException ignore) { return EMPTY_TAG; } catch (Exception ignore) { return null; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java index e9037176..e82760b8 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java @@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.ha; import akka.actor.ActorSelection; import akka.pattern.Patterns; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.akka.requests.Ping; @@ -56,7 +56,7 @@ public class ServerSelectService { // 无锁获取当前数据库中的Server Optional appInfoOpt = appInfoRepository.findById(appId); if (!appInfoOpt.isPresent()) { - throw new OmsException(appId + " is not registered!"); + throw new PowerJobException(appId + " is not registered!"); } String appName = appInfoOpt.get().getAppName(); String originServer = appInfoOpt.get().getCurrentServer(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java index a9ebc414..bab39209 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java @@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.server.service.instance; import akka.actor.ActorSelection; import akka.pattern.Patterns; import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.SystemInstanceResult; import com.github.kfcfans.powerjob.common.model.InstanceDetail; @@ -135,11 +135,11 @@ public class InstanceService { public void retryInstance(Long instanceId) { InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId); if (!InstanceStatus.finishedStatus.contains(instanceInfo.getStatus())) { - throw new OmsException("Only stopped instance can be retry!"); + throw new PowerJobException("Only stopped instance can be retry!"); } // 暂时不支持工作流任务的重试 if (instanceInfo.getWfInstanceId() != null) { - throw new OmsException("Workflow's instance do not support retry!"); + throw new PowerJobException("Workflow's instance do not support retry!"); } instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); @@ -152,7 +152,7 @@ public class InstanceService { // 派发任务 Long jobId = instanceInfo.getJobId(); - JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new OmsException("can't find job info by jobId: " + jobId)); + JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new PowerJobException("can't find job info by jobId: " + jobId)); dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes()); } @@ -187,7 +187,7 @@ public class InstanceService { log.info("[Instance-{}] cancel the instance successfully.", instanceId); }else { log.warn("[Instance-{}] cancel the instance failed.", instanceId); - throw new OmsException("instance already up and running"); + throw new PowerJobException("instance already up and running"); } }catch (Exception e) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java index 08a20494..3be885c4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java @@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.workflow; import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.SystemInstanceResult; import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; @@ -43,7 +43,7 @@ public class WorkflowInstanceService { public void stopWorkflowInstance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { - throw new OmsException("workflow instance already stopped"); + throw new PowerJobException("workflow instance already stopped"); } // 停止所有已启动且未完成的服务 PEWorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); @@ -80,7 +80,7 @@ public class WorkflowInstanceService { public WorkflowInstanceInfoDO fetchWfInstance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId)); if (!Objects.equals(appId, wfInstance.getAppId())) { - throw new OmsException("Permission Denied!"); + throw new PowerJobException("Permission Denied!"); } return wfInstance; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java index 63e224b5..a2e0a78d 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.server.service.workflow; import com.alibaba.fastjson.JSONObject; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.response.WorkflowInfoDTO; @@ -42,7 +42,7 @@ public class WorkflowService { req.valid(); if (!WorkflowDAGUtils.valid(req.getPEWorkflowDAG())) { - throw new OmsException("illegal DAG"); + throw new PowerJobException("illegal DAG"); } Long wfId = req.getId(); @@ -145,7 +145,7 @@ public class WorkflowService { private WorkflowInfoDO permissionCheck(Long wfId, Long appId) { WorkflowInfoDO wfInfo = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id: " + wfId)); if (!wfInfo.getAppId().equals(appId)) { - throw new OmsException("Permission Denied!can't delete other appId's workflow!"); + throw new PowerJobException("Permission Denied!can't delete other appId's workflow!"); } return wfInfo; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/ControllerExceptionHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/ControllerExceptionHandler.java index 3ec9821d..54c3e75e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/ControllerExceptionHandler.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/ControllerExceptionHandler.java @@ -1,6 +1,6 @@ package com.github.kfcfans.powerjob.server.web; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.response.ResultDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.http.converter.HttpMessageNotReadableException; @@ -25,7 +25,7 @@ public class ControllerExceptionHandler { public ResultDTO exceptionHandler(Exception e) { // 不是所有异常都需要打印完整堆栈,后续可以定义内部的Exception,便于判断 - if (e instanceof IllegalArgumentException || e instanceof OmsException) { + if (e instanceof IllegalArgumentException || e instanceof PowerJobException) { log.warn("[ControllerException] http request failed, message is {}.", e.getMessage()); } else if (e instanceof HttpMessageNotReadableException || e instanceof MethodArgumentTypeMismatchException) { log.warn("[ControllerException] invalid http request params, exception is {}.", e.getMessage()); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/ModifyAppInfoRequest.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/ModifyAppInfoRequest.java index 77ea5da3..bd007626 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/ModifyAppInfoRequest.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/ModifyAppInfoRequest.java @@ -1,6 +1,6 @@ package com.github.kfcfans.powerjob.server.web.request; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import lombok.Data; import org.apache.commons.lang3.StringUtils; @@ -21,7 +21,7 @@ public class ModifyAppInfoRequest { public void valid() { CommonUtils.requireNonNull(appName, "appName can't be empty"); if (StringUtils.containsWhitespace(appName)) { - throw new OmsException("appName can't contains white space!"); + throw new PowerJobException("appName can't contains white space!"); } } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java index 3a78f2f9..1cde2378 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java @@ -5,7 +5,7 @@ import akka.actor.ActorSystem; import akka.actor.DeadLetter; import akka.actor.Props; import akka.routing.RoundRobinPool; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.utils.CommonUtils; @@ -163,16 +163,16 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di return appId; }else { log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName); - throw new OmsException(resultDTO.getMessage()); + throw new PowerJobException(resultDTO.getMessage()); } - }catch (OmsException oe) { + }catch (PowerJobException oe) { throw oe; }catch (Exception ignore) { log.warn("[OhMyWorker] assert appName by url({}) failed, please check the server address.", realUrl); } } log.error("[OhMyWorker] no available server in {}.", config.getServerAddress()); - throw new OmsException("no server available!"); + throw new PowerJobException("no server available!"); } @Override diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java index 6dfacce6..8efe4adf 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.worker.container; import com.github.kfcfans.powerjob.common.ContainerConstant; -import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; @@ -106,7 +106,7 @@ public class OmsJarContainer implements OmsContainer { if (propertiesURLStream == null) { log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath()); - throw new OmsException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME); + throw new PowerJobException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME); } properties.load(propertiesURLStream); @@ -115,7 +115,7 @@ public class OmsJarContainer implements OmsContainer { String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY); if (StringUtils.isEmpty(packageName)) { log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId); - throw new OmsException("invalid jar file"); + throw new PowerJobException("invalid jar file"); } // 加载用户类 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index e9957127..38d4937f 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -318,12 +318,12 @@ public class ProcessorTracker { break; default: log.warn("[ProcessorTracker-{}] unknown processor type: {}.", instanceId, processorType); - throw new OmsException("unknown processor type of " + processorType); + throw new PowerJobException("unknown processor type of " + processorType); } if (processor == null) { log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo); - throw new OmsException("fetch Processor failed"); + throw new PowerJobException("fetch Processor failed"); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index 01e1f9ac..e3f6a08e 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -110,7 +110,7 @@ public class CommonTaskTracker extends TaskTracker { log.info("[TaskTracker-{}] create root task successfully.", instanceId); }else { log.error("[TaskTracker-{}] create root task failed.", instanceId); - throw new OmsException("create root task failed for instance: " + instanceId); + throw new PowerJobException("create root task failed for instance: " + instanceId); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index 5f797461..f56e9a62 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -91,7 +91,7 @@ public class FrequentTaskTracker extends TaskTracker { if (timeExpressionType == TimeExpressionType.FIX_RATE) { // 固定频率需要设置最小间隔 if (timeParams < MIN_INTERVAL) { - throw new OmsException("time interval too small, please set the timeExpressionInfo >= 1000"); + throw new PowerJobException("time interval too small, please set the timeExpressionInfo >= 1000"); } scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS); }else { From a138e05404b3ba20884d9264e814fa46bfe220b6 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 3 Oct 2020 13:31:58 +0800 Subject: [PATCH 08/20] feat: add Workflow initPrams #58 --- .../kfcfans/powerjob/client/OhMyClient.java | 17 +++++++++++++---- .../core/model/WorkflowInstanceInfoDO.java | 5 +++++ .../timing/InstanceStatusCheckService.java | 2 +- .../timing/schedule/OmsScheduleService.java | 4 ++-- .../workflow/WorkflowInstanceManager.java | 17 +++++++++++++---- .../service/workflow/WorkflowService.java | 14 ++++++++++---- .../web/controller/OpenAPIController.java | 4 ++-- .../web/controller/WorkflowController.java | 2 +- 8 files changed, 47 insertions(+), 18 deletions(-) diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index e2c9d25f..600f49a9 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -349,17 +349,26 @@ public class OhMyClient { /** * 运行工作流 - * @param workflowId workflowId + * @param workflowId 工作流ID + * @param initParams 启动参数 + * @param delay 延迟时间,单位 MS * @return 工作流实例ID - * @throws Exception 异常 + * @throws Exception 异常信息 */ - public ResultDTO runWorkflow(Long workflowId) throws Exception { + public ResultDTO runWorkflow(Long workflowId, String initParams, long delay) throws Exception { FormBody.Builder builder = new FormBody.Builder() .add("workflowId", workflowId.toString()) - .add("appId", appId.toString()); + .add("appId", appId.toString()) + .add("delay", String.valueOf(delay)); + if (StringUtils.isNotEmpty(initParams)) { + builder.add("initParams", initParams); + } String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build()); return JsonUtils.parseObject(post, ResultDTO.class); } + public ResultDTO runWorkflow(Long workflowId) throws Exception { + return runWorkflow(workflowId, null, 0); + } /* ************* Workflow Instance 区 ************* */ /** diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java index 5eacf323..c2a074a4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java @@ -36,6 +36,11 @@ public class WorkflowInstanceInfoDO { // workflow 状态(WorkflowInstanceStatus) private Integer status; + // 工作流启动参数 + @Lob + @Column + private String wfInitParams; + @Lob @Column private String dag; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java index b846ccbc..489f2874 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java @@ -171,7 +171,7 @@ public class InstanceStatusCheckService { waitingWfInstanceList.forEach(wfInstance -> { Optional workflowOpt = workflowInfoRepository.findById(wfInstance.getWorkflowId()); workflowOpt.ifPresent(workflowInfo -> { - workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId()); + workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId(), wfInstance.getWfInitParams()); log.info("[Workflow-{}|{}] restart workflowInstance successfully~", workflowInfo.getId(), wfInstance.getWfInstanceId()); }); }); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index 6e09a62a..3da63e5d 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -207,7 +207,7 @@ public class OmsScheduleService { wfInfos.forEach(wfInfo -> { // 1. 先生成调度记录,防止不调度的情况发生 - Long wfInstanceId = workflowInstanceManager.create(wfInfo); + Long wfInstanceId = workflowInstanceManager.create(wfInfo, null); // 2. 推入时间轮,准备调度执行 long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis(); @@ -215,7 +215,7 @@ public class OmsScheduleService { log.warn("[Workflow-{}] workflow schedule delay, expect:{}, actual: {}", wfInfo.getId(), wfInfo.getNextTriggerTime(), System.currentTimeMillis()); delay = 0; } - InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId)); + InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, null)); // 3. 重新计算下一次调度时间并更新 try { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index d9e7fab2..37ca9396 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -67,9 +67,10 @@ public class WorkflowInstanceManager { /** * 创建工作流任务实例 * @param wfInfo 工作流任务元数据(描述信息) + * @param initParams 启动参数 * @return wfInstanceId */ - public Long create(WorkflowInfoDO wfInfo) { + public Long create(WorkflowInfoDO wfInfo, String initParams) { Long wfId = wfInfo.getId(); Long wfInstanceId = idGenerateService.allocate(); @@ -82,6 +83,7 @@ public class WorkflowInstanceManager { newWfInstance.setWorkflowId(wfId); newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); newWfInstance.setActualTriggerTime(System.currentTimeMillis()); + newWfInstance.setWfInitParams(initParams); newWfInstance.setGmtCreate(now); newWfInstance.setGmtModified(now); @@ -107,8 +109,9 @@ public class WorkflowInstanceManager { * 开始任务 * @param wfInfo 工作流任务信息 * @param wfInstanceId 工作流任务实例ID + * @param initParams 启动参数 */ - public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) { + public void start(WorkflowInfoDO wfInfo, Long wfInstanceId, String initParams) { Optional wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId); if (!wfInstanceInfoOpt.isPresent()) { @@ -132,13 +135,19 @@ public class WorkflowInstanceManager { try { + // 构建根任务启动参数(为了精简 worker 端实现,启动参数仍以 instanceParams 字段承接) + Map preJobId2Result = Maps.newHashMap(); + // 模拟 preJobId -> preJobResult 的格式,-1 代表前置任务不存在 + preJobId2Result.put("-1", initParams); + String wfRootInstanceParams = JSONObject.toJSONString(preJobId2Result); + PEWorkflowDAG peWorkflowDAG = JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); List roots = WorkflowDAGUtils.listRoots(peWorkflowDAG); peWorkflowDAG.getNodes().forEach(node -> node.setStatus(InstanceStatus.WAITING_DISPATCH.getV())); // 创建所有的根任务 roots.forEach(root -> { - Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis()); + Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), wfRootInstanceParams, wfInstanceId, System.currentTimeMillis()); root.setInstanceId(instanceId); root.setStatus(InstanceStatus.RUNNING.getV()); @@ -152,7 +161,7 @@ public class WorkflowInstanceManager { log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId); // 真正开始执行根任务 - roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, null)); + roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, wfRootInstanceParams)); }catch (Exception e) { log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java index a2e0a78d..073d7491 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java @@ -11,6 +11,7 @@ import com.github.kfcfans.powerjob.server.common.utils.CronExpression; import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils; import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository; +import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; @@ -130,15 +131,20 @@ public class WorkflowService { * 立即运行工作流 * @param wfId 工作流ID * @param appId 所属应用ID + * @param initParams 启动参数 + * @param delay 延迟时间 * @return 该 workflow 实例的 instanceId(wfInstanceId) */ - public Long runWorkflow(Long wfId, Long appId) { + public Long runWorkflow(Long wfId, Long appId, String initParams, long delay) { WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); - Long wfInstanceId = workflowInstanceManager.create(wfInfo); + Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams); - // 正式启动任务 - workflowInstanceManager.start(wfInfo, wfInstanceId); + if (delay <= 0) { + workflowInstanceManager.start(wfInfo, wfInstanceId, initParams); + }else { + InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, initParams)); + } return wfInstanceId; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java index 40fd0b6a..a846f73c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java @@ -150,8 +150,8 @@ public class OpenAPIController { } @PostMapping(OpenAPIConstant.RUN_WORKFLOW) - public ResultDTO runWorkflow(Long workflowId, Long appId) { - return ResultDTO.success(workflowService.runWorkflow(workflowId, appId)); + public ResultDTO runWorkflow(Long workflowId, Long appId, @RequestParam(required = false) String initParams, @RequestParam(required = false) Long delay) { + return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, initParams, delay == null ? 0 : delay)); } /* ************* Workflow Instance 区 ************* */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowController.java index 25095922..9eb1b589 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowController.java @@ -79,7 +79,7 @@ public class WorkflowController { @GetMapping("/run") public ResultDTO runWorkflow(Long workflowId, Long appId) { - return ResultDTO.success(workflowService.runWorkflow(workflowId, appId)); + return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, null, 0)); } private static PageResult convertPage(Page originPage) { From fb61ad0bc65ffb333da64efc911773bf9ffb5fb1 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 11:19:24 +0800 Subject: [PATCH 09/20] fix: fixed-cron-expression schedule #64 --- .../powerjob/server/service/JobService.java | 10 ++- .../timing/schedule/OmsScheduleService.java | 63 +++++++++++-------- 2 files changed, 45 insertions(+), 28 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 9b8a874c..9ae00a1c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.server.service; import com.github.kfcfans.powerjob.common.InstanceStatus; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.response.JobInfoDTO; @@ -78,7 +79,7 @@ public class JobService { jobInfoDO.setNotifyUserIds(SJ.commaJoiner.join(request.getNotifyUserIds())); } - refreshJob(jobInfoDO); + calculateNextTriggerTime(jobInfoDO); if (request.getId() == null) { jobInfoDO.setGmtCreate(new Date()); } @@ -143,7 +144,7 @@ public class JobService { JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId)); jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV()); - refreshJob(jobInfoDO); + calculateNextTriggerTime(jobInfoDO); jobInfoRepository.saveAndFlush(jobInfoDO); } @@ -184,7 +185,7 @@ public class JobService { }); } - private void refreshJob(JobInfoDO jobInfoDO) throws Exception { + private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws Exception { // 计算下次调度时间 Date now = new Date(); TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); @@ -192,6 +193,9 @@ public class JobService { if (timeExpressionType == TimeExpressionType.CRON) { CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); Date nextValidTime = cronExpression.getNextValidTimeAfter(now); + if (nextValidTime == null) { + throw new PowerJobException("invalid cron expression: " + jobInfoDO.getTimeExpression()); + } jobInfoDO.setNextTriggerTime(nextValidTime.getTime()); }else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) { jobInfoDO.setTimeExpression(null); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index 3da63e5d..2115a664 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -121,8 +121,6 @@ public class OmsScheduleService { */ private void scheduleCronJob(List appIds) { - - Date now = new Date(); long nowTime = System.currentTimeMillis(); long timeThreshold = nowTime + 2 * SCHEDULE_RATE; Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> { @@ -165,24 +163,13 @@ public class OmsScheduleService { }); // 3. 计算下一次调度时间(忽略5S内的重复执行,即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms) - List updatedJobInfos = Lists.newLinkedList(); jobInfos.forEach(jobInfoDO -> { - try { - - Date nextTriggerTime = calculateNextTriggerTime(jobInfoDO.getNextTriggerTime(), jobInfoDO.getTimeExpression()); - - JobInfoDO updatedJobInfo = new JobInfoDO(); - BeanUtils.copyProperties(jobInfoDO, updatedJobInfo); - updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime()); - updatedJobInfo.setGmtModified(now); - - updatedJobInfos.add(updatedJobInfo); + refreshJob(jobInfoDO); } catch (Exception e) { - log.error("[Job-{}] calculate next trigger time failed.", jobInfoDO.getId(), e); + log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e); } }); - jobInfoRepository.saveAll(updatedJobInfos); jobInfoRepository.flush(); @@ -203,7 +190,6 @@ public class OmsScheduleService { return; } - Date now = new Date(); wfInfos.forEach(wfInfo -> { // 1. 先生成调度记录,防止不调度的情况发生 @@ -219,16 +205,9 @@ public class OmsScheduleService { // 3. 重新计算下一次调度时间并更新 try { - Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); - - WorkflowInfoDO updateEntity = new WorkflowInfoDO(); - BeanUtils.copyProperties(wfInfo, updateEntity); - - updateEntity.setNextTriggerTime(nextTriggerTime.getTime()); - updateEntity.setGmtModified(now); - workflowInfoRepository.save(updateEntity); + refreshWorkflow(wfInfo); }catch (Exception e) { - log.error("[Workflow-{}] parse cron failed.", wfInfo.getId(), e); + log.error("[Workflow-{}] refresh workflow failed.", wfInfo.getId(), e); } }); workflowInfoRepository.flush(); @@ -264,6 +243,40 @@ public class OmsScheduleService { }); } + private void refreshJob(JobInfoDO jobInfo) throws Exception { + Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression()); + + JobInfoDO updatedJobInfo = new JobInfoDO(); + BeanUtils.copyProperties(jobInfo, updatedJobInfo); + + if (nextTriggerTime == null) { + log.warn("[Job-{}] this job won't be scheduled anymore, system will set the status to DISABLE!", jobInfo.getId()); + updatedJobInfo.setStatus(SwitchableStatus.DISABLE.getV()); + }else { + updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime()); + } + updatedJobInfo.setGmtModified(new Date()); + + jobInfoRepository.save(updatedJobInfo); + } + + private void refreshWorkflow(WorkflowInfoDO wfInfo) throws Exception { + Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); + + WorkflowInfoDO updateEntity = new WorkflowInfoDO(); + BeanUtils.copyProperties(wfInfo, updateEntity); + + if (nextTriggerTime == null) { + log.warn("[Workflow-{}] this workflow won't be scheduled anymore, system will set the status to DISABLE!", wfInfo.getId()); + wfInfo.setStatus(SwitchableStatus.DISABLE.getV()); + }else { + updateEntity.setNextTriggerTime(nextTriggerTime.getTime()); + } + + updateEntity.setGmtModified(new Date()); + workflowInfoRepository.save(updateEntity); + } + /** * 计算下次触发时间 * @param preTriggerTime 前一次触发时间 From 8560112432b3bcc7bb0aa83c30080ad49096322c Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 12:40:10 +0800 Subject: [PATCH 10/20] feat: update sql for new DO --- others/oms-sql.sql | 35 ++++++++++--------- .../src/test/java/TestWorkflow.java | 32 +++++++++++++++-- .../resources/application-daily.properties | 2 +- 3 files changed, 49 insertions(+), 20 deletions(-) diff --git a/others/oms-sql.sql b/others/oms-sql.sql index 332cd8ee..1a934017 100644 --- a/others/oms-sql.sql +++ b/others/oms-sql.sql @@ -1,15 +1,17 @@ /* Navicat Premium Data Transfer + Source Server : Local MySQL Source Server Type : MySQL - Source Server Version : 80020 - Source Schema : powerjob-product + Source Server Version : 80021 + Source Host : localhost:3306 + Source Schema : powerjob-daily Target Server Type : MySQL - Target Server Version : 80020 + Target Server Version : 80021 File Encoding : 65001 - Date: 23/06/2020 22:30:06 + Date: 08/10/2020 12:39:10 */ SET NAMES utf8mb4; @@ -28,7 +30,7 @@ CREATE TABLE `app_info` ( `password` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `appNameUK` (`app_name`) -) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for container_info @@ -62,10 +64,10 @@ CREATE TABLE `instance_info` ( `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, `instance_id` bigint DEFAULT NULL, - `instance_params` text, + `instance_params` longtext, `job_id` bigint DEFAULT NULL, `last_report_time` bigint DEFAULT NULL, - `result` text, + `result` longtext, `running_times` bigint DEFAULT NULL, `status` int DEFAULT NULL, `task_tracker_address` varchar(255) DEFAULT NULL, @@ -75,7 +77,7 @@ CREATE TABLE `instance_info` ( KEY `IDX5b1nhpe5je7gc5s1ur200njr7` (`job_id`), KEY `IDXjnji5lrr195kswk6f7mfhinrs` (`app_id`), KEY `IDXa98hq3yu0l863wuotdjl7noum` (`instance_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for job_info @@ -101,7 +103,7 @@ CREATE TABLE `job_info` ( `min_memory_space` double NOT NULL, `next_trigger_time` bigint DEFAULT NULL, `notify_user_ids` varchar(255) DEFAULT NULL, - `processor_info` text, + `processor_info` longtext, `processor_type` int DEFAULT NULL, `status` int DEFAULT NULL, `task_retry_num` int DEFAULT NULL, @@ -109,7 +111,7 @@ CREATE TABLE `job_info` ( `time_expression_type` int DEFAULT NULL, PRIMARY KEY (`id`), KEY `IDXk2xprmn3lldmlcb52i36udll1` (`app_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for oms_lock @@ -124,7 +126,7 @@ CREATE TABLE `oms_lock` ( `ownerip` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `lockNameUK` (`lock_name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for server_info @@ -166,7 +168,7 @@ CREATE TABLE `workflow_info` ( `max_wf_instance_num` int DEFAULT NULL, `next_trigger_time` bigint DEFAULT NULL, `notify_user_ids` varchar(255) DEFAULT NULL, - `pedag` text, + `pedag` longtext, `status` int DEFAULT NULL, `time_expression` varchar(255) DEFAULT NULL, `time_expression_type` int DEFAULT NULL, @@ -174,7 +176,7 @@ CREATE TABLE `workflow_info` ( `wf_name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`), KEY `IDX7uo5w0e3beeho3fnx9t7eiol3` (`app_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for workflow_instance_info @@ -184,15 +186,16 @@ CREATE TABLE `workflow_instance_info` ( `id` bigint NOT NULL AUTO_INCREMENT, `actual_trigger_time` bigint DEFAULT NULL, `app_id` bigint DEFAULT NULL, - `dag` text, + `dag` longtext, `finished_time` bigint DEFAULT NULL, `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, - `result` text, + `result` longtext, `status` int DEFAULT NULL, + `wf_init_params` longtext, `wf_instance_id` bigint DEFAULT NULL, `workflow_id` bigint DEFAULT NULL, PRIMARY KEY (`id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; SET FOREIGN_KEY_CHECKS = 1; diff --git a/powerjob-client/src/test/java/TestWorkflow.java b/powerjob-client/src/test/java/TestWorkflow.java index 3b5f2f75..c99efc0e 100644 --- a/powerjob-client/src/test/java/TestWorkflow.java +++ b/powerjob-client/src/test/java/TestWorkflow.java @@ -1,7 +1,11 @@ import com.github.kfcfans.powerjob.client.OhMyClient; +import com.github.kfcfans.powerjob.common.ExecuteType; +import com.github.kfcfans.powerjob.common.ProcessorType; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; +import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; +import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.google.common.collect.Lists; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -20,7 +24,23 @@ public class TestWorkflow { @BeforeAll public static void initClient() throws Exception { - ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test", null); + ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123"); + } + + @Test + public void initTestData() throws Exception { + SaveJobInfoRequest base = new SaveJobInfoRequest(); + base.setJobName("DAG-Node-"); + base.setTimeExpressionType(TimeExpressionType.WORKFLOW); + base.setExecuteType(ExecuteType.STANDALONE); + base.setProcessorType(ProcessorType.EMBEDDED_JAVA); + base.setProcessorInfo("com.github.kfcfans.powerjob.samples.workflow.WorkflowStandaloneProcessor"); + + for (int i = 0; i < 5; i++) { + SaveJobInfoRequest request = JsonUtils.parseObject(JsonUtils.toBytes(base), SaveJobInfoRequest.class); + request.setJobName(request.getJobName() + i); + System.out.println(ohMyClient.saveJob(request)); + } } @Test @@ -30,8 +50,8 @@ public class TestWorkflow { List nodes = Lists.newLinkedList(); List edges = Lists.newLinkedList(); - nodes.add(new PEWorkflowDAG.Node(1L, "node-1")); - nodes.add(new PEWorkflowDAG.Node(2L, "node-2")); + nodes.add(new PEWorkflowDAG.Node(1L, "DAG-Node-1")); + nodes.add(new PEWorkflowDAG.Node(2L, "DAG-Node-2")); edges.add(new PEWorkflowDAG.Edge(1L, 2L)); @@ -81,4 +101,10 @@ public class TestWorkflow { public void testFetchWfInstanceInfo() throws Exception { System.out.println(ohMyClient.fetchWorkflowInstanceInfo(149962433421639744L)); } + + @Test + public void testRunWorkflowPlus() throws Exception { + System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 1", 0)); + System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 2", 30)); + } } diff --git a/powerjob-server/src/main/resources/application-daily.properties b/powerjob-server/src/main/resources/application-daily.properties index 5eca462b..6d83511f 100644 --- a/powerjob-server/src/main/resources/application-daily.properties +++ b/powerjob-server/src/main/resources/application-daily.properties @@ -11,7 +11,7 @@ spring.datasource.core.hikari.minimum-idle=5 ####### mongoDB配置,非核心依赖,通过配置 oms.mongodb.enable=false 来关闭 ####### oms.mongodb.enable=true -spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily +spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-daily ####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) ####### spring.mail.host=smtp.163.com From 763e416be3d2ed59daf026dcb8a8f5ce35e0ddc9 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 12:55:20 +0800 Subject: [PATCH 11/20] feat: optimize out of data's cron expression and passed the test --- .../powerjob/server/service/JobService.java | 2 +- .../web/controller/InstanceController.java | 5 ++-- .../powerjob/server/test/CronTest.java | 26 +++++++++++++++++++ 3 files changed, 30 insertions(+), 3 deletions(-) create mode 100644 powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/CronTest.java diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 9ae00a1c..ad81184f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -194,7 +194,7 @@ public class JobService { CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); Date nextValidTime = cronExpression.getNextValidTimeAfter(now); if (nextValidTime == null) { - throw new PowerJobException("invalid cron expression: " + jobInfoDO.getTimeExpression()); + throw new PowerJobException("cron expression is out of date: " + jobInfoDO.getTimeExpression()); } jobInfoDO.setNextTriggerTime(nextValidTime.getTime()); }else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java index 6f6f389c..300b55db 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.server.web.controller; import com.github.kfcfans.powerjob.common.InstanceStatus; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils; @@ -145,10 +146,10 @@ public class InstanceController { private String getTargetServer(Long instanceId) { InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); if (instanceInfo == null) { - throw new RuntimeException("invalid instanceId: " + instanceId); + throw new PowerJobException("invalid instanceId: " + instanceId); } Optional appInfoOpt = appInfoRepository.findById(instanceInfo.getAppId()); - return appInfoOpt.orElseThrow(() -> new RuntimeException("impossible")).getCurrentServer(); + return appInfoOpt.orElseThrow(() -> new PowerJobException("impossible")).getCurrentServer(); } } diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/CronTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/CronTest.java new file mode 100644 index 00000000..98316099 --- /dev/null +++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/CronTest.java @@ -0,0 +1,26 @@ +package com.github.kfcfans.powerjob.server.test; + + +import com.github.kfcfans.powerjob.server.common.utils.CronExpression; +import org.junit.Test; + +import java.util.Date; + +/** + * CRON 测试 + * + * @author tjq + * @since 2020/10/8 + */ +public class CronTest { + + private static final String FIXED_CRON = "0 0 13 8 10 ? 2020-2020"; + + @Test + public void testFixedTimeCron() throws Exception { + CronExpression cronExpression = new CronExpression(FIXED_CRON); + System.out.println(cronExpression.getCronExpression()); + System.out.println(cronExpression.getNextValidTimeAfter(new Date())); + } + +} From 71dbe05647330b509c7bd50592f6a2f20e3d5328 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 13:10:47 +0800 Subject: [PATCH 12/20] feat: optimize workflow OpenAPI and passed the test --- .../java/com/github/kfcfans/powerjob/client/OhMyClient.java | 6 +++--- powerjob-client/src/test/java/TestWorkflow.java | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index 600f49a9..a2f91a13 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -351,15 +351,15 @@ public class OhMyClient { * 运行工作流 * @param workflowId 工作流ID * @param initParams 启动参数 - * @param delay 延迟时间,单位 MS + * @param delayMS 延迟时间,单位毫秒 ms * @return 工作流实例ID * @throws Exception 异常信息 */ - public ResultDTO runWorkflow(Long workflowId, String initParams, long delay) throws Exception { + public ResultDTO runWorkflow(Long workflowId, String initParams, long delayMS) throws Exception { FormBody.Builder builder = new FormBody.Builder() .add("workflowId", workflowId.toString()) .add("appId", appId.toString()) - .add("delay", String.valueOf(delay)); + .add("delay", String.valueOf(delayMS)); if (StringUtils.isNotEmpty(initParams)) { builder.add("initParams", initParams); } diff --git a/powerjob-client/src/test/java/TestWorkflow.java b/powerjob-client/src/test/java/TestWorkflow.java index c99efc0e..0e888742 100644 --- a/powerjob-client/src/test/java/TestWorkflow.java +++ b/powerjob-client/src/test/java/TestWorkflow.java @@ -104,7 +104,6 @@ public class TestWorkflow { @Test public void testRunWorkflowPlus() throws Exception { - System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 1", 0)); - System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 2", 30)); + System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 2", 90000)); } } From a9e4ed6262d13e72168c1f1ca5b2932a351b5c6b Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 15:47:38 +0800 Subject: [PATCH 13/20] fix: stopwatch exception in FrequentTaskTracker --- powerjob-worker-agent/src/main/resources/logback.xml | 4 ++-- .../worker/core/tracker/task/FrequentTaskTracker.java | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/powerjob-worker-agent/src/main/resources/logback.xml b/powerjob-worker-agent/src/main/resources/logback.xml index c8d4ec08..ac501a9a 100644 --- a/powerjob-worker-agent/src/main/resources/logback.xml +++ b/powerjob-worker-agent/src/main/resources/logback.xml @@ -29,7 +29,7 @@ ${LOG_PATH}/powerjob-agent-error.log ${LOG_PATH}/powerjob-agent-error.%d{yyyy-MM-dd}.log - 7 + 3 %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -49,7 +49,7 @@ ${LOG_PATH}/powerjob-agent-application.log ${LOG_PATH}/powerjob-agent-application.%d{yyyy-MM-dd}.log - 7 + 3 %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index f56e9a62..ffc448fa 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -295,14 +295,11 @@ public class FrequentTaskTracker extends TaskTracker { newLastTask.setAddress(OhMyWorker.getWorkerAddress()); submitTask(Lists.newArrayList(newLastTask)); } - } } - // 舍去一切重试机制,反正超时就失败 - - log.debug("[TaskTracker-{}] check status using {}.", instanceId, stopwatch.stop()); } + log.debug("[TaskTracker-{}] check status using {}.", instanceId, stopwatch); } private void reportStatus() { From a67adf96017de69810d5e82f78ef3de9d97567e7 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 16:54:47 +0800 Subject: [PATCH 14/20] fix: never update lastActiveTime which lead to TIMEOUT for frequentJob(thanks @Y) --- .../processors/BroadcastProcessorDemo.java | 1 + .../worker/actors/TaskTrackerActor.java | 2 +- .../core/tracker/task/FrequentTaskTracker.java | 17 +++++++++++++++-- .../worker/core/tracker/task/TaskTracker.java | 5 +++-- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java index 9d7e898f..73bb3ec7 100644 --- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java @@ -36,6 +36,7 @@ public class BroadcastProcessorDemo extends BroadcastProcessor { public ProcessResult process(TaskContext taskContext) throws Exception { System.out.println("===== BroadcastProcessorDemo#process ======"); taskContext.getOmsLogger().info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost()); + Thread.sleep(45 * 1000); return new ProcessResult(true); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java index cbb69d8d..4b53ae84 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java @@ -65,7 +65,7 @@ public class TaskTrackerActor extends AbstractActor { taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult()); } - taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult()); + taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult()); } /** diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index ffc448fa..b575ec4b 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; +import javax.annotation.Nullable; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -104,6 +105,14 @@ public class FrequentTaskTracker extends TaskTracker { scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS); } + @Override + public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) { + super.updateTaskStatus(subInstanceId, taskId, newStatus, reportTime, result); + // 更新 LastActiveTime + SubInstanceTimeHolder timeHolder = subInstanceId2TimeHolder.get(subInstanceId); + timeHolder.lastActiveTime = Math.max(reportTime, timeHolder.lastActiveTime); + } + @Override public InstanceDetail fetchRunningStatus() { InstanceDetail detail = new InstanceDetail(); @@ -243,9 +252,13 @@ public class FrequentTaskTracker extends TaskTracker { long heartbeatTimeout = nowTS - timeHolder.lastActiveTime; // 超时(包含总运行时间超时和心跳包超时),直接判定为失败 - if (executeTimeout > instanceTimeoutMS || heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) { + if (executeTimeout > instanceTimeoutMS) { + onFinished(subInstanceId, false, "RUNNING_TIMEOUT", iterator); + continue; + } - onFinished(subInstanceId, false, "TIMEOUT", iterator); + if (heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) { + onFinished(subInstanceId, false, "HEARTBEAT_TIMEOUT", iterator); continue; } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index a34401c4..1974e0c7 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -138,12 +138,13 @@ public abstract class TaskTracker { /** * 更新Task状态 * V1.0.0 -> V1.0.1(e405e283ad7f97b0b4e5d369c7de884c0caf9192) 锁方案变更,从 synchronized (taskId.intern()) 修改为分段锁,能大大减少内存占用,损失的只有理论并发度而已 + * @param subInstanceId 子任务实例ID * @param taskId task的ID(task为任务实例的执行单位) * @param newStatus task的新状态 * @param reportTime 上报时间 * @param result task的执行结果,未执行完成时为空 */ - public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) { + public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) { if (finished.get()) { return; @@ -278,7 +279,7 @@ public abstract class TaskTracker { List unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress); if (!CollectionUtils.isEmpty(unfinishedTask)) { log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask); - unfinishedTask.forEach(task -> updateTaskStatus(task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result")); + unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result")); } } } From 9aa8c0bb87891d39022b4599d1aadb9450c56511 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 17:01:15 +0800 Subject: [PATCH 15/20] fix: Incorrect sequencing leads to LAUNCH_FAILED --- .../core/tracker/task/FrequentTaskTracker.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index b575ec4b..5d43a969 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -153,15 +153,10 @@ public class FrequentTaskTracker extends TaskTracker { // 子任务实例ID Long subInstanceId = triggerTimes.incrementAndGet(); - // 记录时间 - SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder(); - timeHolder.startTime = timeHolder.lastActiveTime = System.currentTimeMillis(); - subInstanceId2TimeHolder.put(subInstanceId, timeHolder); - - // 执行记录缓存 + // 执行记录缓存(只做展示,因此可以放在前面) SubInstanceInfo subInstanceInfo = new SubInstanceInfo(); subInstanceInfo.status = TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue(); - subInstanceInfo.startTime = timeHolder.startTime; + subInstanceInfo.startTime = System.currentTimeMillis(); recentSubInstanceInfo.put(subInstanceId, subInstanceInfo); String myAddress = OhMyWorker.getWorkerAddress(); @@ -200,6 +195,11 @@ public class FrequentTaskTracker extends TaskTracker { return; } + // 生成记录信息(必须保证持久化成功才能生成该记录,否则会导致 LAUNCH_FAILED 错误) + SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder(); + timeHolder.startTime = timeHolder.lastActiveTime = System.currentTimeMillis(); + subInstanceId2TimeHolder.put(subInstanceId, timeHolder); + dispatchTask(newRootTask, myAddress); } From 6d8fe8e7ff69d56d17d02a7d52b88af02a038db7 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 17:08:14 +0800 Subject: [PATCH 16/20] refactor: optimize server log level --- others/logs/BasicFunctionTestRecord.md | 105 ------------------ others/logs/ContainerTestRecord.md | 34 ------ others/logs/OnlineLogTestRecord.md | 100 ----------------- .../server/service/InstanceLogService.java | 2 +- 4 files changed, 1 insertion(+), 240 deletions(-) delete mode 100644 others/logs/BasicFunctionTestRecord.md delete mode 100644 others/logs/ContainerTestRecord.md delete mode 100644 others/logs/OnlineLogTestRecord.md diff --git a/others/logs/BasicFunctionTestRecord.md b/others/logs/BasicFunctionTestRecord.md deleted file mode 100644 index 4ffb171a..00000000 --- a/others/logs/BasicFunctionTestRecord.md +++ /dev/null @@ -1,105 +0,0 @@ -# 2020.4.8 第一轮测试 -## 测试用例 -* MapReduce任务:http://localhost:7700/job/save?appId=1&concurrency=5&executeType=MAP_REDUCE&groupName=null&instanceRetryNum=3&instanceTimeLimit=4545454545&jobDescription=jobDescription&jobName=testJob&jobParams=%7B%22a%22%3A%22b%22%7D&maxInstanceNum=1&processorInfo=com.github.kfcfans.powerjob.processors.TestMapReduceProcessor&processorType=EMBEDDED_JAVA&status=1&taskRetryNum=3&taskTimeLimit=564465656&timeExpression=0%20*%20*%20*%20*%20%3F%20&timeExpressionType=CRON - -## 问题记录 -#### 任务执行成功,释放资源失败 -第一个任务执行完成后,释放资源阶段(删除本地H2数据库中所有记录)报错,堆栈如下: -```text -2020-04-08 10:09:19 INFO - [ProcessorTracker-1586311659084] mission complete, ProcessorTracker already destroyed! -2020-04-08 10:09:19 ERROR - [TaskPersistenceService] deleteAllTasks failed, instanceId=1586311659084. -java.lang.InterruptedException: sleep interrupted - at java.lang.Thread.sleep(Native Method) - at CommonUtils.executeWithRetry(CommonUtils.java:34) - at TaskPersistenceService.execute(TaskPersistenceService.java:297) - at TaskPersistenceService.deleteAllTasks(TaskPersistenceService.java:269) - at CommonTaskTracker.destroy(TaskTracker.java:231) - at CommonTaskTracker$StatusCheckRunnable.innerRun(TaskTracker.java:421) - at CommonTaskTracker$StatusCheckRunnable.run(TaskTracker.java:467) - at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) - at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) - at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) - at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) - at java.lang.Thread.run(Thread.java:748) -2020-04-08 10:09:19 WARN - [TaskTracker-1586311659084] delete tasks from database failed. -2020-04-08 10:09:19 INFO - [TaskTracker-1586311659084] TaskTracker has left the world. -``` -随后,Server派发下来的第二个任务也无法完成创建,异常堆栈如下: -```text -2020-04-08 10:10:08 ERROR - [TaskPersistenceService] save taskTaskDO{taskId='0', jobId='1', instanceId='1586311804030', taskName='OMS_ROOT_TASK', address='10.37.129.2:2777', status=1, result='null', failedCnt=0, createdTime=1586311808295, lastModifiedTime=1586311808295} failed. -2020-04-08 10:10:08 ERROR - [TaskTracker-1586311804030] create root task failed. -[ERROR] [04/08/2020 10:10:08.511] [oms-akka.actor.internal-dispatcher-20] [akka://oms/user/task_tracker] create root task failed. -java.lang.RuntimeException: create root task failed. - at CommonTaskTracker.persistenceRootTask(TaskTracker.java:208) - at CommonTaskTracker.(TaskTracker.java:81) - at TaskTrackerActor.lambda$onReceiveServerScheduleJobReq$2(TaskTrackerActor.java:138) - at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) - at TaskTrackerPool.atomicCreateTaskTracker(TaskTrackerPool.java:30) - at TaskTrackerActor.onReceiveServerScheduleJobReq(TaskTrackerActor.java:138) -``` -*** -原因及解决方案:destroy方法调用了scheduledPool.shutdownNow()方法导致调用该方法的线程池被强制关闭,该方法也自然被中断,数据删到一半没删掉,破坏了数据库结构,后面的insert自然也就失败了。 - -# 2020.4.11 "集群"测试 -#### 任务重试机制失效 -原因:SQL中的now()函数返回的是Datetime,不能用ing/bigint去接收... - -#### SystemMetric算分问题 -问题:java.lang.management.OperatingSystemMXBean#getSystemLoadAverage 不一定能获取CPU当前负载,可能返回负数代表不可用... -解决方案:印度Windows上getSystemLoadAverage()固定返回-1...太坑了...先做个保护性判断继续测试吧... - -#### 未知的数组越界问题(可能是数据库性能问题) -问题:秒级Broadcast任务在第四次执行时,当Processor完成执行上报状态时,TaskTracker报错,错误的本质原因是无法从数据库中找到这个task对应的记录... -场景:时间表达式:FIX_DELAY,对应的TaskTracker为FrequentTaskTracker - -异常堆栈 -```text -2020-04-16 18:05:09 ERROR - [TaskPersistenceService] getTaskStatus failed, instanceId=1586857062542,taskId=4. -java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 - at java.util.LinkedList.checkElementIndex(LinkedList.java:555) - at java.util.LinkedList.get(LinkedList.java:476) - at TaskPersistenceService.lambda$getTaskStatus$10(TaskPersistenceService.java:214) - at CommonUtils.executeWithRetry(CommonUtils.java:37) - at TaskPersistenceService.execute(TaskPersistenceService.java:310) - at TaskPersistenceService.getTaskStatus(TaskPersistenceService.java:212) - at TaskTracker.updateTaskStatus(TaskTracker.java:107) - at TaskTracker.broadcast(TaskTracker.java:214) - at TaskTrackerActor.onReceiveBroadcastTaskPreExecuteFinishedReq(TaskTrackerActor.java:106) - at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) - at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) - at scala.PartialFunction.applyOrElse(PartialFunction.scala:187) - at scala.PartialFunction.applyOrElse$(PartialFunction.scala:186) - at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) - at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:241) - at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242) - at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242) - at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242) - at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242) - at akka.actor.Actor.aroundReceive(Actor.scala:534) - at akka.actor.Actor.aroundReceive$(Actor.scala:532) - at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) - at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573) - at akka.actor.ActorCell.invoke(ActorCell.scala:543) - at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269) - at akka.dispatch.Mailbox.run(Mailbox.scala:230) - at akka.dispatch.Mailbox.exec(Mailbox.scala:242) - at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) - at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) - at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) - at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) -2020-04-16 18:05:09 WARN - [TaskTracker-1586857062542] query TaskStatus from DB failed when try to update new TaskStatus(taskId=4,newStatus=6). -``` -解决方案:初步怀疑在连续更改时,由于数据库锁的存在导致行不可见(不知道H2具体的特性)。因此,需要保证同一个taskId串行更新 -> synchronize Yes! - -# 2020.4.20 1.0.0发布前测试 -#### Server & Worker -* 指定机器执行 -> 验证通过 -* Map/MapReduce/Standalone/Broadcast/Shell/Python处理器的执行 -> 验证通过 -* 超时失败 -> 验证通过 -* 破坏测试:指定错误的处理器 -> 发现问题,会造成死锁(TT创建PT,PT创建失败,无法定期汇报心跳,TT长时间未收到PT心跳,认为PT宕机(确实宕机了),无法选择可用的PT再次派发任务,死锁形成,GG斯密达 T_T)。通过确保ProcessorTracker一定能创建成功解决,如果处理器构建失败,之后所有提交的任务直接返回错误。 -#### Client -* StopInstance -> success -* FetchInstanceStatus -> success - diff --git a/others/logs/ContainerTestRecord.md b/others/logs/ContainerTestRecord.md deleted file mode 100644 index 7a0d3326..00000000 --- a/others/logs/ContainerTestRecord.md +++ /dev/null @@ -1,34 +0,0 @@ -# 容器测试日志 -## ClassNotFound问题 ->玩热加载这一套,不来几个ClassNotFound都没那味 [滑稽]~ - -测试容器化的MapReduce任务时,发现如下错误: -```text -2020-05-19 09:33:18 ERROR - [ProcessorRunnable-142925055284740224] execute failed, please fix this bug @tjq! -com.esotericsoftware.kryo.KryoException: Unable to find class: cn.edu.zju.oms.container.ContainerMRProcessor$TestSubTask - at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:182) - at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:151) - at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:684) - at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:795) - at SerializerUtils.deSerialized(SerializerUtils.java:48) - at ProcessorRunnable.innerRun(ProcessorRunnable.java:63) - at ProcessorRunnable.run(ProcessorRunnable.java:179) - at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) - at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) - at java.util.concurrent.FutureTask.run(FutureTask.java) - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) - at java.lang.Thread.run(Thread.java:748) -Caused by: java.lang.ClassNotFoundException: cn.edu.zju.oms.container.ContainerMRProcessor$TestSubTask - at java.net.URLClassLoader.findClass(URLClassLoader.java:382) - at java.lang.ClassLoader.loadClass(ClassLoader.java:418) - at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) - at java.lang.ClassLoader.loadClass(ClassLoader.java:351) - at java.lang.Class.forName0(Native Method) - at java.lang.Class.forName(Class.java:348) - at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:176) - ... 12 common frames omitted -``` - -* 原因分析:经过分析,原有在于序列化与反序列化过程中,框架为了追求性能,采用了**对象池**技术(库存代码: a14f554e0085b6a179375a8ca04665434b73c7bd#SerializerUtils),而Kryo在序列化和反序列化过程中只会使用固定的类加载器(创建kryo的类对象(Kryo.class)的类加载器),因此无法找到由OMS自定义类加载器创建的容器类。 -* 解决方案:弃用性能优异的对象池技术,该用ThreadLocal + 手动设置Kryo类加载器。 \ No newline at end of file diff --git a/others/logs/OnlineLogTestRecord.md b/others/logs/OnlineLogTestRecord.md deleted file mode 100644 index 7ce8eb94..00000000 --- a/others/logs/OnlineLogTestRecord.md +++ /dev/null @@ -1,100 +0,0 @@ -## V1.0.0 -#### 持久化链路 -1. 客户端使用内存队列异步化批量上报服务器 -2. 服务器接收到请求后无脑写H2数据库 -3. 任务结束后,流式同步到MongoDB持久化存储,维护一个包含Array的MongoDB对象 -4. 同步结束后删除本地所有数据 -#### 查询链路 -* 如果本地存在数据,则直接从本地数据库返回 -* 如果本地不存在数据,则直连MongoDB,获取数据再返回 - -*** -问题主要在于前台展示:测试100W条数据,本地H2占用82M,MongoDB未知(因为mongo shell不知道为啥用不了...),不过应该也小不到哪里去。这种情况下数据都没办法回传回来...需要更新方案。 - -```text -org.apache.catalina.connector.ClientAbortException: java.io.IOException: Broken pipe - at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:351) - at org.apache.catalina.connector.OutputBuffer.flushByteBuffer(OutputBuffer.java:776) - at org.apache.catalina.connector.OutputBuffer.append(OutputBuffer.java:681) - at org.apache.catalina.connector.OutputBuffer.writeBytes(OutputBuffer.java:386) - at org.apache.catalina.connector.OutputBuffer.write(OutputBuffer.java:364) - at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:96) - at com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:2137) - at com.fasterxml.jackson.core.json.UTF8JsonGenerator.flush(UTF8JsonGenerator.java:1150) - at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:923) - at org.springframework.http.converter.json.AbstractJackson2HttpMessageConverter.writeInternal(AbstractJackson2HttpMessageConverter.java:287) - at org.springframework.http.converter.AbstractGenericHttpMessageConverter.write(AbstractGenericHttpMessageConverter.java:104) - at org.springframework.web.servlet.mvc.method.annotation.AbstractMessageConverterMethodProcessor.writeWithMessageConverters(AbstractMessageConverterMethodProcessor.java:287) - at org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor.handleReturnValue(RequestResponseBodyMethodProcessor.java:181) - at org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite.handleReturnValue(HandlerMethodReturnValueHandlerComposite.java:82) - at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:123) - at org.springframework.web.servlet.mvc.method.annotation.ExceptionHandlerExceptionResolver.doResolveHandlerMethodException(ExceptionHandlerExceptionResolver.java:403) - at org.springframework.web.servlet.handler.AbstractHandlerMethodExceptionResolver.doResolveException(AbstractHandlerMethodExceptionResolver.java:61) - at org.springframework.web.servlet.handler.AbstractHandlerExceptionResolver.resolveException(AbstractHandlerExceptionResolver.java:141) - at org.springframework.web.servlet.handler.HandlerExceptionResolverComposite.resolveException(HandlerExceptionResolverComposite.java:80) - at org.springframework.web.servlet.DispatcherServlet.processHandlerException(DispatcherServlet.java:1300) - at org.springframework.web.servlet.DispatcherServlet.processDispatchResult(DispatcherServlet.java:1111) - at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1057) - at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) - at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) - at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) - at javax.servlet.http.HttpServlet.service(HttpServlet.java:634) - at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) - at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) - at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) - at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) - at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) - at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) - at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) - at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) - at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) - at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) - at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) - at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) - at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) - at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) - at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) - at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) - at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) - at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) - at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) - at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) - at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) - at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541) - at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) - at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) - at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) - at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) - at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:373) - at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) - at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) - at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1594) - at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) - at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) - at java.lang.Thread.run(Thread.java:748) -Caused by: java.io.IOException: Broken pipe - at sun.nio.ch.FileDispatcherImpl.write0(Native Method) - at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) - at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) - at sun.nio.ch.IOUtil.write(IOUtil.java:65) - at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) - at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:138) - at org.apache.tomcat.util.net.NioBlockingSelector.write(NioBlockingSelector.java:101) - at org.apache.tomcat.util.net.NioSelectorPool.write(NioSelectorPool.java:152) - at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1253) - at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:740) - at org.apache.tomcat.util.net.SocketWrapperBase.writeBlocking(SocketWrapperBase.java:560) - at org.apache.tomcat.util.net.SocketWrapperBase.write(SocketWrapperBase.java:504) - at org.apache.coyote.http11.Http11OutputBuffer$SocketOutputBuffer.doWrite(Http11OutputBuffer.java:538) - at org.apache.coyote.http11.filters.ChunkedOutputFilter.doWrite(ChunkedOutputFilter.java:110) - at org.apache.coyote.http11.Http11OutputBuffer.doWrite(Http11OutputBuffer.java:190) - at org.apache.coyote.Response.doWrite(Response.java:601) - at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:339) - ... 60 common frames omitted - -``` - -## V2.0.0 ->经过小小的调查,mongoDB似乎允许用户直接使用它的文件系统:GridFS,完成文件的存储。那么要不要改成文件对文件的形式呢?同步开始时,先在本地生成日志文件,然后同步到MongoDB。查询时则先下载文件。一旦拥有了完整的文件,分页什么的也就容易实现了,前端展示一次1000行之类的~ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java index c05fe17d..90a9fd8c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java @@ -193,7 +193,7 @@ public class InstanceLogService { try { instanceId2LastReportTime.remove(instanceId); CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); - log.warn("[InstanceLog-{}] delete local instanceLog successfully.", instanceId); + log.info("[InstanceLog-{}] delete local instanceLog successfully.", instanceId); }catch (Exception e) { log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e); } From ab0757a87eba7f63cb7dc40ee6fcad3ea4c1d1cc Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 21:05:41 +0800 Subject: [PATCH 17/20] fix: remove FrequentTaskTracker's heart beat,just using instanceTimeoutMS to avoid instance never stopped --- .../tracker/processor/ProcessorTracker.java | 2 +- .../tracker/task/FrequentTaskTracker.java | 29 ++++--------------- .../worker/core/tracker/task/TaskTracker.java | 12 ++++---- 3 files changed, 13 insertions(+), 30 deletions(-) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 38d4937f..b481e0c7 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -63,7 +63,7 @@ public class ProcessorTracker { private ThreadPoolExecutor threadPool; private ScheduledExecutorService timingPool; - private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100; + private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128; // 长时间空闲的 ProcessorTracker 会发起销毁请求 private static final long MAX_IDLE_TIME = 300000; diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index 5d43a969..27d3af71 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -105,14 +105,6 @@ public class FrequentTaskTracker extends TaskTracker { scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS); } - @Override - public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) { - super.updateTaskStatus(subInstanceId, taskId, newStatus, reportTime, result); - // 更新 LastActiveTime - SubInstanceTimeHolder timeHolder = subInstanceId2TimeHolder.get(subInstanceId); - timeHolder.lastActiveTime = Math.max(reportTime, timeHolder.lastActiveTime); - } - @Override public InstanceDetail fetchRunningStatus() { InstanceDetail detail = new InstanceDetail(); @@ -181,7 +173,7 @@ public class FrequentTaskTracker extends TaskTracker { if (maxInstanceNum > 0) { if (timeExpressionType == TimeExpressionType.FIX_RATE) { if (subInstanceId2TimeHolder.size() > maxInstanceNum) { - log.warn("[TaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId); + log.warn("[FQTaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId); processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE"); return; } @@ -190,14 +182,14 @@ public class FrequentTaskTracker extends TaskTracker { // 必须先持久化,持久化成功才能 dispatch,否则会导致后续报错(因为DB中没有这个taskId对应的记录,会各种报错) if (!taskPersistenceService.save(newRootTask)) { - log.error("[TaskTracker-{}] Launcher create new root task failed.", instanceId); + log.error("[FQTaskTracker-{}] Launcher create new root task failed.", instanceId); processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED"); return; } // 生成记录信息(必须保证持久化成功才能生成该记录,否则会导致 LAUNCH_FAILED 错误) SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder(); - timeHolder.startTime = timeHolder.lastActiveTime = System.currentTimeMillis(); + timeHolder.startTime = System.currentTimeMillis(); subInstanceId2TimeHolder.put(subInstanceId, timeHolder); dispatchTask(newRootTask, myAddress); @@ -208,7 +200,7 @@ public class FrequentTaskTracker extends TaskTracker { try { innerRun(); }catch (Exception e) { - log.error("[TaskTracker-{}] launch task failed.", instanceId, e); + log.error("[FQTaskTracker-{}] launch task failed.", instanceId, e); } } } @@ -218,8 +210,6 @@ public class FrequentTaskTracker extends TaskTracker { */ private class Checker implements Runnable { - private static final long HEARTBEAT_TIMEOUT_MS = 60000; - @Override public void run() { @@ -231,7 +221,7 @@ public class FrequentTaskTracker extends TaskTracker { checkStatus(); reportStatus(); }catch (Exception e) { - log.warn("[TaskTracker-{}] check and report status failed.", instanceId, e); + log.warn("[FQTaskTracker-{}] check and report status failed.", instanceId, e); } } @@ -249,7 +239,6 @@ public class FrequentTaskTracker extends TaskTracker { SubInstanceTimeHolder timeHolder = entry.getValue(); long executeTimeout = nowTS - timeHolder.startTime; - long heartbeatTimeout = nowTS - timeHolder.lastActiveTime; // 超时(包含总运行时间超时和心跳包超时),直接判定为失败 if (executeTimeout > instanceTimeoutMS) { @@ -257,11 +246,6 @@ public class FrequentTaskTracker extends TaskTracker { continue; } - if (heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) { - onFinished(subInstanceId, false, "HEARTBEAT_TIMEOUT", iterator); - continue; - } - // 查看执行情况 InstanceStatisticsHolder holder = getInstanceStatisticsHolder(subInstanceId); @@ -312,7 +296,7 @@ public class FrequentTaskTracker extends TaskTracker { } // 舍去一切重试机制,反正超时就失败 } - log.debug("[TaskTracker-{}] check status using {}.", instanceId, stopwatch); + log.debug("[FQTaskTracker-{}] check status using {}.", instanceId, stopwatch); } private void reportStatus() { @@ -387,7 +371,6 @@ public class FrequentTaskTracker extends TaskTracker { private static class SubInstanceTimeHolder { private long startTime; - private long lastActiveTime; } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index 1974e0c7..bb9bb88f 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -166,7 +166,7 @@ public abstract class TaskTracker { lastReportTime = taskOpt.get().getLastReportTime(); }else { // 理论上不存在这种情况,除非数据库异常 - log.error("[TaskTracker-{}] can't find task by pkey(instanceId={}&taskId={}).", instanceId, instanceId, taskId); + log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId); } if (lastReportTime == null) { @@ -176,8 +176,8 @@ public abstract class TaskTracker { // 过滤过期的请求(潜在的集群时间一致性需求,重试跨Worker时,时间不一致可能导致问题) if (lastReportTime > reportTime) { - log.warn("[TaskTracker-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.", - instanceId, lastReportTime, reportTime, taskId, newStatus); + log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.", + instanceId, subInstanceId, lastReportTime, reportTime, taskId, newStatus); return; } @@ -215,7 +215,7 @@ public abstract class TaskTracker { boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity); if (retryTask) { - log.info("[TaskTracker-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, taskId); + log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId); return; } } @@ -227,12 +227,12 @@ public abstract class TaskTracker { boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result); if (!updateResult) { - log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId); + log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId); } } catch (InterruptedException ignore) { } catch (Exception e) { - log.warn("[TaskTracker-{}] update task status failed.", instanceId, e); + log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e); } finally { segmentLock.unlock(lockId); } From d4e285bd9fcc2a5e5f681d5e0655dfce1030027d Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 21:15:05 +0800 Subject: [PATCH 18/20] feat: powerjob-server update to SpringBoot v2.3.4.RELEASE --- powerjob-server/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 672af6e2..6b72e369 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -15,7 +15,7 @@ 2.9.2 - 2.2.6.RELEASE + 2.3.4.RELEASE 3.3.0 8.0.19 From 7b54dccdd4caf3b0cb0120b59cac66705f16a284 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 21:21:35 +0800 Subject: [PATCH 19/20] optimzie: enlarge ThreadPool --- .../powerjob/server/common/config/ThreadPoolConfig.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java index dca75fa7..5597eaf5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java @@ -27,8 +27,8 @@ public class ThreadPoolConfig { @Bean("omsTimingPool") public Executor getTimingPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); - executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 16); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 32); // use SynchronousQueue executor.setQueueCapacity(0); executor.setKeepAliveSeconds(60); @@ -44,8 +44,8 @@ public class ThreadPoolConfig { @Bean("omsBackgroundPool") public Executor initBackgroundPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); - executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 8); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 16); executor.setQueueCapacity(8192); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("omsBackgroundPool-"); From 455d8b1e16f66607cfad824df7bd136b792a7912 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 21:31:51 +0800 Subject: [PATCH 20/20] feat: extend UserInfo to prepare for WebHook alarm support --- others/oms-sql.sql | 2 ++ .../powerjob/server/persistence/core/model/UserInfoDO.java | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/others/oms-sql.sql b/others/oms-sql.sql index 1a934017..cc029c33 100644 --- a/others/oms-sql.sql +++ b/others/oms-sql.sql @@ -153,6 +153,8 @@ CREATE TABLE `user_info` ( `password` varchar(255) DEFAULT NULL, `phone` varchar(255) DEFAULT NULL, `username` varchar(255) DEFAULT NULL, + `extra` varchar(255) DEFAULT NULL, + `web_hook` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/UserInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/UserInfoDO.java index f2a6170f..4eeda952 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/UserInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/UserInfoDO.java @@ -29,6 +29,11 @@ public class UserInfoDO { private String phone; // 邮箱地址 private String email; + // webHook + private String webHook; + + // 扩展字段 + private String extra; private Date gmtCreate; private Date gmtModified;