diff --git a/README.md b/README.md
index 658653a3..936894e6 100644
--- a/README.md
+++ b/README.md
@@ -47,7 +47,7 @@ PowerJob 的设计目标为企业级的分布式任务调度平台,即成为
| 在线任务治理 | 不支持 | 支持 | 支持 | **支持** |
| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** |
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** |
-| 报警监控 | 无 | 邮件 | 短信 | **邮件,提供接口允许开发者扩展** |
+| 报警监控 | 无 | 邮件 | 短信 | **邮件与钉钉,并支持开发者扩展** |
| 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** |
| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** |
diff --git a/others/oms-sql.sql b/others/oms-sql.sql
index 332cd8ee..1a934017 100644
--- a/others/oms-sql.sql
+++ b/others/oms-sql.sql
@@ -1,15 +1,17 @@
/*
Navicat Premium Data Transfer
+ Source Server : Local MySQL
Source Server Type : MySQL
- Source Server Version : 80020
- Source Schema : powerjob-product
+ Source Server Version : 80021
+ Source Host : localhost:3306
+ Source Schema : powerjob-daily
Target Server Type : MySQL
- Target Server Version : 80020
+ Target Server Version : 80021
File Encoding : 65001
- Date: 23/06/2020 22:30:06
+ Date: 08/10/2020 12:39:10
*/
SET NAMES utf8mb4;
@@ -28,7 +30,7 @@ CREATE TABLE `app_info` (
`password` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `appNameUK` (`app_name`)
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for container_info
@@ -62,10 +64,10 @@ CREATE TABLE `instance_info` (
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
`instance_id` bigint DEFAULT NULL,
- `instance_params` text,
+ `instance_params` longtext,
`job_id` bigint DEFAULT NULL,
`last_report_time` bigint DEFAULT NULL,
- `result` text,
+ `result` longtext,
`running_times` bigint DEFAULT NULL,
`status` int DEFAULT NULL,
`task_tracker_address` varchar(255) DEFAULT NULL,
@@ -75,7 +77,7 @@ CREATE TABLE `instance_info` (
KEY `IDX5b1nhpe5je7gc5s1ur200njr7` (`job_id`),
KEY `IDXjnji5lrr195kswk6f7mfhinrs` (`app_id`),
KEY `IDXa98hq3yu0l863wuotdjl7noum` (`instance_id`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for job_info
@@ -101,7 +103,7 @@ CREATE TABLE `job_info` (
`min_memory_space` double NOT NULL,
`next_trigger_time` bigint DEFAULT NULL,
`notify_user_ids` varchar(255) DEFAULT NULL,
- `processor_info` text,
+ `processor_info` longtext,
`processor_type` int DEFAULT NULL,
`status` int DEFAULT NULL,
`task_retry_num` int DEFAULT NULL,
@@ -109,7 +111,7 @@ CREATE TABLE `job_info` (
`time_expression_type` int DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `IDXk2xprmn3lldmlcb52i36udll1` (`app_id`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for oms_lock
@@ -124,7 +126,7 @@ CREATE TABLE `oms_lock` (
`ownerip` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `lockNameUK` (`lock_name`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for server_info
@@ -166,7 +168,7 @@ CREATE TABLE `workflow_info` (
`max_wf_instance_num` int DEFAULT NULL,
`next_trigger_time` bigint DEFAULT NULL,
`notify_user_ids` varchar(255) DEFAULT NULL,
- `pedag` text,
+ `pedag` longtext,
`status` int DEFAULT NULL,
`time_expression` varchar(255) DEFAULT NULL,
`time_expression_type` int DEFAULT NULL,
@@ -174,7 +176,7 @@ CREATE TABLE `workflow_info` (
`wf_name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `IDX7uo5w0e3beeho3fnx9t7eiol3` (`app_id`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for workflow_instance_info
@@ -184,15 +186,16 @@ CREATE TABLE `workflow_instance_info` (
`id` bigint NOT NULL AUTO_INCREMENT,
`actual_trigger_time` bigint DEFAULT NULL,
`app_id` bigint DEFAULT NULL,
- `dag` text,
+ `dag` longtext,
`finished_time` bigint DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
- `result` text,
+ `result` longtext,
`status` int DEFAULT NULL,
+ `wf_init_params` longtext,
`wf_instance_id` bigint DEFAULT NULL,
`workflow_id` bigint DEFAULT NULL,
PRIMARY KEY (`id`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml
index d1e080d8..76a28752 100644
--- a/powerjob-client/pom.xml
+++ b/powerjob-client/pom.xml
@@ -10,11 +10,11 @@
4.0.0
powerjob-client
- 3.2.3
+ 3.3.0
jar
- 3.2.3
+ 3.3.0
5.6.1
diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
index 5a573cbb..a2f91a13 100644
--- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
+++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
@@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.client;
import com.github.kfcfans.powerjob.common.InstanceStatus;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.OpenAPIConstant;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
@@ -68,7 +68,7 @@ public class OhMyClient {
currentAddress = addr;
break;
}else {
- throw new OmsException(resultDTO.getMessage());
+ throw new PowerJobException(resultDTO.getMessage());
}
}
}catch (IOException ignore) {
@@ -76,7 +76,7 @@ public class OhMyClient {
}
if (StringUtils.isEmpty(currentAddress)) {
- throw new OmsException("no server available");
+ throw new PowerJobException("no server available");
}
log.info("[OhMyClient] {}'s oms-client bootstrap successfully, using server: {}", appName, currentAddress);
}
@@ -108,7 +108,7 @@ public class OhMyClient {
request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JsonUtils.toJSONStringUnsafe(request);
- String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(json, jsonType));
+ String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json));
return JsonUtils.parseObject(post, ResultDTO.class);
}
@@ -283,7 +283,7 @@ public class OhMyClient {
request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JsonUtils.toJSONStringUnsafe(request);
- String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(json, jsonType));
+ String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json));
return JsonUtils.parseObject(post, ResultDTO.class);
}
@@ -349,17 +349,26 @@ public class OhMyClient {
/**
* 运行工作流
- * @param workflowId workflowId
+ * @param workflowId 工作流ID
+ * @param initParams 启动参数
+ * @param delayMS 延迟时间,单位毫秒 ms
* @return 工作流实例ID
- * @throws Exception 异常
+ * @throws Exception 异常信息
*/
- public ResultDTO runWorkflow(Long workflowId) throws Exception {
+ public ResultDTO runWorkflow(Long workflowId, String initParams, long delayMS) throws Exception {
FormBody.Builder builder = new FormBody.Builder()
.add("workflowId", workflowId.toString())
- .add("appId", appId.toString());
+ .add("appId", appId.toString())
+ .add("delay", String.valueOf(delayMS));
+ if (StringUtils.isNotEmpty(initParams)) {
+ builder.add("initParams", initParams);
+ }
String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build());
return JsonUtils.parseObject(post, ResultDTO.class);
}
+ public ResultDTO runWorkflow(Long workflowId) throws Exception {
+ return runWorkflow(workflowId, null, 0);
+ }
/* ************* Workflow Instance 区 ************* */
/**
@@ -426,6 +435,6 @@ public class OhMyClient {
}
log.error("[OhMyClient] do post for path: {} failed because of no server available in {}.", path, allAddress);
- throw new OmsException("no server available when send post");
+ throw new PowerJobException("no server available when send post");
}
}
diff --git a/powerjob-client/src/test/java/TestWorkflow.java b/powerjob-client/src/test/java/TestWorkflow.java
index 3b5f2f75..0e888742 100644
--- a/powerjob-client/src/test/java/TestWorkflow.java
+++ b/powerjob-client/src/test/java/TestWorkflow.java
@@ -1,7 +1,11 @@
import com.github.kfcfans.powerjob.client.OhMyClient;
+import com.github.kfcfans.powerjob.common.ExecuteType;
+import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
+import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
+import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -20,7 +24,23 @@ public class TestWorkflow {
@BeforeAll
public static void initClient() throws Exception {
- ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test", null);
+ ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
+ }
+
+ @Test
+ public void initTestData() throws Exception {
+ SaveJobInfoRequest base = new SaveJobInfoRequest();
+ base.setJobName("DAG-Node-");
+ base.setTimeExpressionType(TimeExpressionType.WORKFLOW);
+ base.setExecuteType(ExecuteType.STANDALONE);
+ base.setProcessorType(ProcessorType.EMBEDDED_JAVA);
+ base.setProcessorInfo("com.github.kfcfans.powerjob.samples.workflow.WorkflowStandaloneProcessor");
+
+ for (int i = 0; i < 5; i++) {
+ SaveJobInfoRequest request = JsonUtils.parseObject(JsonUtils.toBytes(base), SaveJobInfoRequest.class);
+ request.setJobName(request.getJobName() + i);
+ System.out.println(ohMyClient.saveJob(request));
+ }
}
@Test
@@ -30,8 +50,8 @@ public class TestWorkflow {
List nodes = Lists.newLinkedList();
List edges = Lists.newLinkedList();
- nodes.add(new PEWorkflowDAG.Node(1L, "node-1"));
- nodes.add(new PEWorkflowDAG.Node(2L, "node-2"));
+ nodes.add(new PEWorkflowDAG.Node(1L, "DAG-Node-1"));
+ nodes.add(new PEWorkflowDAG.Node(2L, "DAG-Node-2"));
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
@@ -81,4 +101,9 @@ public class TestWorkflow {
public void testFetchWfInstanceInfo() throws Exception {
System.out.println(ohMyClient.fetchWorkflowInstanceInfo(149962433421639744L));
}
+
+ @Test
+ public void testRunWorkflowPlus() throws Exception {
+ System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 2", 90000));
+ }
}
diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml
index 8aec6af7..947b04b9 100644
--- a/powerjob-common/pom.xml
+++ b/powerjob-common/pom.xml
@@ -10,7 +10,7 @@
4.0.0
powerjob-common
- 3.2.3
+ 3.3.0
jar
@@ -18,7 +18,7 @@
3.10
2.6
29.0-jre
- 4.4.1
+ 3.14.9
2.6.4
5.6.1
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsException.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsException.java
deleted file mode 100644
index 3d7db8c3..00000000
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.github.kfcfans.powerjob.common;
-
-/**
- * OhMyScheduler 运行时异常
- *
- * @author tjq
- * @since 2020/5/26
- */
-public class OmsException extends RuntimeException {
-
- public OmsException() {
- }
-
- public OmsException(String message) {
- super(message);
- }
-
- public OmsException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public OmsException(Throwable cause) {
- super(cause);
- }
-
- public OmsException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-}
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobException.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobException.java
new file mode 100644
index 00000000..5d8aba04
--- /dev/null
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobException.java
@@ -0,0 +1,29 @@
+package com.github.kfcfans.powerjob.common;
+
+/**
+ * PowerJob 运行时异常
+ *
+ * @author tjq
+ * @since 2020/5/26
+ */
+public class PowerJobException extends RuntimeException {
+
+ public PowerJobException() {
+ }
+
+ public PowerJobException(String message) {
+ super(message);
+ }
+
+ public PowerJobException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PowerJobException(Throwable cause) {
+ super(cause);
+ }
+
+ public PowerJobException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/InstanceInfoDTO.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/InstanceInfoDTO.java
index 3f26e896..c95f25df 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/InstanceInfoDTO.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/InstanceInfoDTO.java
@@ -20,12 +20,16 @@ public class InstanceInfoDTO {
private Long appId;
// 任务实例ID
private Long instanceId;
+ // 工作流实例ID
+ private Long wfInstanceId;
// 任务实例参数
private String instanceParams;
/**
* 任务状态 {@link InstanceStatus}
*/
private int status;
+ // 该任务实例的类型,普通/工作流(InstanceType)
+ private Integer type;
// 执行结果
private String result;
// 预计触发时间
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java
index 08da3c00..78c1973a 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java
@@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.common.utils;
import com.github.kfcfans.powerjob.common.OmsConstant;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
@@ -121,11 +121,11 @@ public class CommonUtils {
public static T requireNonNull(T obj, String msg) {
if (obj == null) {
- throw new OmsException(msg);
+ throw new PowerJobException(msg);
}
if (obj instanceof String) {
if (StringUtils.isEmpty((String) obj)) {
- throw new OmsException(msg);
+ throw new PowerJobException(msg);
}
}
return obj;
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java
index 4cf47bfc..3d0a61ae 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java
@@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.common.utils;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import org.apache.commons.lang3.exception.ExceptionUtils;
/**
@@ -54,6 +54,6 @@ public class JsonUtils {
}catch (Exception e) {
ExceptionUtils.rethrow(e);
}
- throw new OmsException("impossible");
+ throw new PowerJobException("impossible");
}
}
diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml
index 9b2b2378..672af6e2 100644
--- a/powerjob-server/pom.xml
+++ b/powerjob-server/pom.xml
@@ -10,13 +10,13 @@
4.0.0
powerjob-server
- 3.2.3
+ 3.3.0
jar
2.9.2
2.2.6.RELEASE
- 3.2.3
+ 3.3.0
8.0.19
19.7.0.0
@@ -57,6 +57,11 @@
ojdbc8
${ojdbc.version}
+
+ com.oracle.database.nls
+ orai18n
+ ${ojdbc.version}
+
com.microsoft.sqlserver
@@ -166,6 +171,12 @@
com.aliyun
alibaba-dingtalk-service-sdk
${dingding.version}
+
+
+ log4j
+ log4j
+
+
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java
index 366800d6..6196d1cb 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java
@@ -7,7 +7,7 @@ import com.dingtalk.api.request.OapiMessageCorpconversationAsyncsendV2Request;
import com.dingtalk.api.request.OapiUserGetByMobileRequest;
import com.dingtalk.api.response.OapiGettokenResponse;
import com.dingtalk.api.response.OapiUserGetByMobileResponse;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -52,7 +52,7 @@ public class DingTalkUtils implements Closeable {
refreshAccessToken(appKey, appSecret);
if (StringUtils.isEmpty(accessToken)) {
- throw new OmsException("fetch AccessToken failed, please check your appKey & appSecret");
+ throw new PowerJobException("fetch AccessToken failed, please check your appKey & appSecret");
}
scheduledPool = Executors.newSingleThreadScheduledExecutor();
@@ -91,7 +91,7 @@ public class DingTalkUtils implements Closeable {
return execute.getUserid();
}
log.info("[DingTalkUtils] fetch userId by mobile({}) failed,reason is {}.", mobile, execute.getErrmsg());
- throw new OmsException("fetch userId by phone number failed, reason is " + execute.getErrmsg());
+ throw new PowerJobException("fetch userId by phone number failed, reason is " + execute.getErrmsg());
}
public void sendMarkdownAsync(String title, List entities, String userList, Long agentId) throws Exception {
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java
index e96a913a..2a98e9f9 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java
@@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.server.common.utils;
import com.github.kfcfans.powerjob.common.InstanceStatus;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.server.model.WorkflowDAG;
@@ -76,7 +76,7 @@ public class WorkflowDAGUtils {
Map id2Node = Maps.newHashMap();
if (PEWorkflowDAG.getNodes() == null || PEWorkflowDAG.getNodes().isEmpty()) {
- throw new OmsException("empty graph");
+ throw new PowerJobException("empty graph");
}
// 创建节点
@@ -95,7 +95,7 @@ public class WorkflowDAGUtils {
WorkflowDAG.Node to = id2Node.get(edge.getTo());
if (from == null || to == null) {
- throw new OmsException("Illegal Edge: " + JsonUtils.toJSONString(edge));
+ throw new PowerJobException("Illegal Edge: " + JsonUtils.toJSONString(edge));
}
from.getSuccessors().add(to);
@@ -106,7 +106,7 @@ public class WorkflowDAGUtils {
// 合法性校验(至少存在一个顶点)
if (rootIds.size() < 1) {
- throw new OmsException("Illegal DAG: " + JsonUtils.toJSONString(PEWorkflowDAG));
+ throw new PowerJobException("Illegal DAG: " + JsonUtils.toJSONString(PEWorkflowDAG));
}
List roots = Lists.newLinkedList();
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java
index 5eacf323..c2a074a4 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java
@@ -36,6 +36,11 @@ public class WorkflowInstanceInfoDO {
// workflow 状态(WorkflowInstanceStatus)
private Integer status;
+ // 工作流启动参数
+ @Lob
+ @Column
+ private String wfInitParams;
+
@Lob
@Column
private String dag;
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/AppInfoService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/AppInfoService.java
index 2b0ec22a..2f8b0bb3 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/AppInfoService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/AppInfoService.java
@@ -1,6 +1,6 @@
package com.github.kfcfans.powerjob.server.service;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import org.springframework.stereotype.Service;
@@ -28,11 +28,11 @@ public class AppInfoService {
*/
public Long assertApp(String appName, String password) {
- AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new OmsException("can't find appInfo by appName: " + appName));
+ AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new PowerJobException("can't find appInfo by appName: " + appName));
if (Objects.equals(appInfo.getPassword(), password)) {
return appInfo.getId();
}
- throw new OmsException("password error!");
+ throw new PowerJobException("password error!");
}
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java
index 9b3810e8..ad81184f 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java
@@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.service;
import com.github.kfcfans.powerjob.common.InstanceStatus;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
@@ -70,7 +71,7 @@ public class JobService {
jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
if (jobInfoDO.getMaxWorkerCount() == null) {
- jobInfoDO.setMaxInstanceNum(0);
+ jobInfoDO.setMaxWorkerCount(0);
}
// 转化报警用户列表
@@ -78,7 +79,7 @@ public class JobService {
jobInfoDO.setNotifyUserIds(SJ.commaJoiner.join(request.getNotifyUserIds()));
}
- refreshJob(jobInfoDO);
+ calculateNextTriggerTime(jobInfoDO);
if (request.getId() == null) {
jobInfoDO.setGmtCreate(new Date());
}
@@ -143,7 +144,7 @@ public class JobService {
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId));
jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV());
- refreshJob(jobInfoDO);
+ calculateNextTriggerTime(jobInfoDO);
jobInfoRepository.saveAndFlush(jobInfoDO);
}
@@ -184,7 +185,7 @@ public class JobService {
});
}
- private void refreshJob(JobInfoDO jobInfoDO) throws Exception {
+ private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws Exception {
// 计算下次调度时间
Date now = new Date();
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
@@ -192,6 +193,9 @@ public class JobService {
if (timeExpressionType == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(now);
+ if (nextValidTime == null) {
+ throw new PowerJobException("cron expression is out of date: " + jobInfoDO.getTimeExpression());
+ }
jobInfoDO.setNextTriggerTime(nextValidTime.getTime());
}else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) {
jobInfoDO.setTimeExpression(null);
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java
index eedbf5ac..aa01d08f 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java
@@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.server.service.alarm.impl;
import com.github.kfcfans.powerjob.common.OmsConstant;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.SJ;
@@ -55,7 +55,7 @@ public class DingTalkAlarmService implements Alarmable {
String userId = mobile2UserIdCache.get(user.getPhone(), () -> {
try {
return dingTalkUtils.fetchUserIdByMobile(user.getPhone());
- } catch (OmsException ignore) {
+ } catch (PowerJobException ignore) {
return EMPTY_TAG;
} catch (Exception ignore) {
return null;
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java
index e9037176..e82760b8 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java
@@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.ha;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.akka.requests.Ping;
@@ -56,7 +56,7 @@ public class ServerSelectService {
// 无锁获取当前数据库中的Server
Optional appInfoOpt = appInfoRepository.findById(appId);
if (!appInfoOpt.isPresent()) {
- throw new OmsException(appId + " is not registered!");
+ throw new PowerJobException(appId + " is not registered!");
}
String appName = appInfoOpt.get().getAppName();
String originServer = appInfoOpt.get().getCurrentServer();
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java
index a9ebc414..bab39209 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java
@@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.server.service.instance;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.InstanceStatus;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.SystemInstanceResult;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
@@ -135,11 +135,11 @@ public class InstanceService {
public void retryInstance(Long instanceId) {
InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
if (!InstanceStatus.finishedStatus.contains(instanceInfo.getStatus())) {
- throw new OmsException("Only stopped instance can be retry!");
+ throw new PowerJobException("Only stopped instance can be retry!");
}
// 暂时不支持工作流任务的重试
if (instanceInfo.getWfInstanceId() != null) {
- throw new OmsException("Workflow's instance do not support retry!");
+ throw new PowerJobException("Workflow's instance do not support retry!");
}
instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
@@ -152,7 +152,7 @@ public class InstanceService {
// 派发任务
Long jobId = instanceInfo.getJobId();
- JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new OmsException("can't find job info by jobId: " + jobId));
+ JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new PowerJobException("can't find job info by jobId: " + jobId));
dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes());
}
@@ -187,7 +187,7 @@ public class InstanceService {
log.info("[Instance-{}] cancel the instance successfully.", instanceId);
}else {
log.warn("[Instance-{}] cancel the instance failed.", instanceId);
- throw new OmsException("instance already up and running");
+ throw new PowerJobException("instance already up and running");
}
}catch (Exception e) {
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java
index b846ccbc..489f2874 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java
@@ -171,7 +171,7 @@ public class InstanceStatusCheckService {
waitingWfInstanceList.forEach(wfInstance -> {
Optional workflowOpt = workflowInfoRepository.findById(wfInstance.getWorkflowId());
workflowOpt.ifPresent(workflowInfo -> {
- workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId());
+ workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId(), wfInstance.getWfInitParams());
log.info("[Workflow-{}|{}] restart workflowInstance successfully~", workflowInfo.getId(), wfInstance.getWfInstanceId());
});
});
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java
index 6e09a62a..2115a664 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java
@@ -121,8 +121,6 @@ public class OmsScheduleService {
*/
private void scheduleCronJob(List appIds) {
-
- Date now = new Date();
long nowTime = System.currentTimeMillis();
long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
@@ -165,24 +163,13 @@ public class OmsScheduleService {
});
// 3. 计算下一次调度时间(忽略5S内的重复执行,即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms)
- List updatedJobInfos = Lists.newLinkedList();
jobInfos.forEach(jobInfoDO -> {
-
try {
-
- Date nextTriggerTime = calculateNextTriggerTime(jobInfoDO.getNextTriggerTime(), jobInfoDO.getTimeExpression());
-
- JobInfoDO updatedJobInfo = new JobInfoDO();
- BeanUtils.copyProperties(jobInfoDO, updatedJobInfo);
- updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime());
- updatedJobInfo.setGmtModified(now);
-
- updatedJobInfos.add(updatedJobInfo);
+ refreshJob(jobInfoDO);
} catch (Exception e) {
- log.error("[Job-{}] calculate next trigger time failed.", jobInfoDO.getId(), e);
+ log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e);
}
});
- jobInfoRepository.saveAll(updatedJobInfos);
jobInfoRepository.flush();
@@ -203,11 +190,10 @@ public class OmsScheduleService {
return;
}
- Date now = new Date();
wfInfos.forEach(wfInfo -> {
// 1. 先生成调度记录,防止不调度的情况发生
- Long wfInstanceId = workflowInstanceManager.create(wfInfo);
+ Long wfInstanceId = workflowInstanceManager.create(wfInfo, null);
// 2. 推入时间轮,准备调度执行
long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis();
@@ -215,20 +201,13 @@ public class OmsScheduleService {
log.warn("[Workflow-{}] workflow schedule delay, expect:{}, actual: {}", wfInfo.getId(), wfInfo.getNextTriggerTime(), System.currentTimeMillis());
delay = 0;
}
- InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId));
+ InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, null));
// 3. 重新计算下一次调度时间并更新
try {
- Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression());
-
- WorkflowInfoDO updateEntity = new WorkflowInfoDO();
- BeanUtils.copyProperties(wfInfo, updateEntity);
-
- updateEntity.setNextTriggerTime(nextTriggerTime.getTime());
- updateEntity.setGmtModified(now);
- workflowInfoRepository.save(updateEntity);
+ refreshWorkflow(wfInfo);
}catch (Exception e) {
- log.error("[Workflow-{}] parse cron failed.", wfInfo.getId(), e);
+ log.error("[Workflow-{}] refresh workflow failed.", wfInfo.getId(), e);
}
});
workflowInfoRepository.flush();
@@ -264,6 +243,40 @@ public class OmsScheduleService {
});
}
+ private void refreshJob(JobInfoDO jobInfo) throws Exception {
+ Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression());
+
+ JobInfoDO updatedJobInfo = new JobInfoDO();
+ BeanUtils.copyProperties(jobInfo, updatedJobInfo);
+
+ if (nextTriggerTime == null) {
+ log.warn("[Job-{}] this job won't be scheduled anymore, system will set the status to DISABLE!", jobInfo.getId());
+ updatedJobInfo.setStatus(SwitchableStatus.DISABLE.getV());
+ }else {
+ updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime());
+ }
+ updatedJobInfo.setGmtModified(new Date());
+
+ jobInfoRepository.save(updatedJobInfo);
+ }
+
+ private void refreshWorkflow(WorkflowInfoDO wfInfo) throws Exception {
+ Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression());
+
+ WorkflowInfoDO updateEntity = new WorkflowInfoDO();
+ BeanUtils.copyProperties(wfInfo, updateEntity);
+
+ if (nextTriggerTime == null) {
+ log.warn("[Workflow-{}] this workflow won't be scheduled anymore, system will set the status to DISABLE!", wfInfo.getId());
+ wfInfo.setStatus(SwitchableStatus.DISABLE.getV());
+ }else {
+ updateEntity.setNextTriggerTime(nextTriggerTime.getTime());
+ }
+
+ updateEntity.setGmtModified(new Date());
+ workflowInfoRepository.save(updateEntity);
+ }
+
/**
* 计算下次触发时间
* @param preTriggerTime 前一次触发时间
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java
index d9e7fab2..37ca9396 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java
@@ -67,9 +67,10 @@ public class WorkflowInstanceManager {
/**
* 创建工作流任务实例
* @param wfInfo 工作流任务元数据(描述信息)
+ * @param initParams 启动参数
* @return wfInstanceId
*/
- public Long create(WorkflowInfoDO wfInfo) {
+ public Long create(WorkflowInfoDO wfInfo, String initParams) {
Long wfId = wfInfo.getId();
Long wfInstanceId = idGenerateService.allocate();
@@ -82,6 +83,7 @@ public class WorkflowInstanceManager {
newWfInstance.setWorkflowId(wfId);
newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
newWfInstance.setActualTriggerTime(System.currentTimeMillis());
+ newWfInstance.setWfInitParams(initParams);
newWfInstance.setGmtCreate(now);
newWfInstance.setGmtModified(now);
@@ -107,8 +109,9 @@ public class WorkflowInstanceManager {
* 开始任务
* @param wfInfo 工作流任务信息
* @param wfInstanceId 工作流任务实例ID
+ * @param initParams 启动参数
*/
- public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) {
+ public void start(WorkflowInfoDO wfInfo, Long wfInstanceId, String initParams) {
Optional wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
if (!wfInstanceInfoOpt.isPresent()) {
@@ -132,13 +135,19 @@ public class WorkflowInstanceManager {
try {
+ // 构建根任务启动参数(为了精简 worker 端实现,启动参数仍以 instanceParams 字段承接)
+ Map preJobId2Result = Maps.newHashMap();
+ // 模拟 preJobId -> preJobResult 的格式,-1 代表前置任务不存在
+ preJobId2Result.put("-1", initParams);
+ String wfRootInstanceParams = JSONObject.toJSONString(preJobId2Result);
+
PEWorkflowDAG peWorkflowDAG = JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
List roots = WorkflowDAGUtils.listRoots(peWorkflowDAG);
peWorkflowDAG.getNodes().forEach(node -> node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()));
// 创建所有的根任务
roots.forEach(root -> {
- Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis());
+ Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), wfRootInstanceParams, wfInstanceId, System.currentTimeMillis());
root.setInstanceId(instanceId);
root.setStatus(InstanceStatus.RUNNING.getV());
@@ -152,7 +161,7 @@ public class WorkflowInstanceManager {
log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId);
// 真正开始执行根任务
- roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, null));
+ roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, wfRootInstanceParams));
}catch (Exception e) {
log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java
index 08a20494..3be885c4 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java
@@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.workflow;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.InstanceStatus;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.SystemInstanceResult;
import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
@@ -43,7 +43,7 @@ public class WorkflowInstanceService {
public void stopWorkflowInstance(Long wfInstanceId, Long appId) {
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
- throw new OmsException("workflow instance already stopped");
+ throw new PowerJobException("workflow instance already stopped");
}
// 停止所有已启动且未完成的服务
PEWorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
@@ -80,7 +80,7 @@ public class WorkflowInstanceService {
public WorkflowInstanceInfoDO fetchWfInstance(Long wfInstanceId, Long appId) {
WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId));
if (!Objects.equals(appId, wfInstance.getAppId())) {
- throw new OmsException("Permission Denied!");
+ throw new PowerJobException("Permission Denied!");
}
return wfInstance;
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java
index 63e224b5..073d7491 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java
@@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.server.service.workflow;
import com.alibaba.fastjson.JSONObject;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.WorkflowInfoDTO;
@@ -11,6 +11,7 @@ import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
+import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
@@ -42,7 +43,7 @@ public class WorkflowService {
req.valid();
if (!WorkflowDAGUtils.valid(req.getPEWorkflowDAG())) {
- throw new OmsException("illegal DAG");
+ throw new PowerJobException("illegal DAG");
}
Long wfId = req.getId();
@@ -130,22 +131,27 @@ public class WorkflowService {
* 立即运行工作流
* @param wfId 工作流ID
* @param appId 所属应用ID
+ * @param initParams 启动参数
+ * @param delay 延迟时间
* @return 该 workflow 实例的 instanceId(wfInstanceId)
*/
- public Long runWorkflow(Long wfId, Long appId) {
+ public Long runWorkflow(Long wfId, Long appId, String initParams, long delay) {
WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
- Long wfInstanceId = workflowInstanceManager.create(wfInfo);
+ Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams);
- // 正式启动任务
- workflowInstanceManager.start(wfInfo, wfInstanceId);
+ if (delay <= 0) {
+ workflowInstanceManager.start(wfInfo, wfInstanceId, initParams);
+ }else {
+ InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, initParams));
+ }
return wfInstanceId;
}
private WorkflowInfoDO permissionCheck(Long wfId, Long appId) {
WorkflowInfoDO wfInfo = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id: " + wfId));
if (!wfInfo.getAppId().equals(appId)) {
- throw new OmsException("Permission Denied!can't delete other appId's workflow!");
+ throw new PowerJobException("Permission Denied!can't delete other appId's workflow!");
}
return wfInfo;
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/ControllerExceptionHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/ControllerExceptionHandler.java
index 3ec9821d..54c3e75e 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/ControllerExceptionHandler.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/ControllerExceptionHandler.java
@@ -1,6 +1,6 @@
package com.github.kfcfans.powerjob.server.web;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.converter.HttpMessageNotReadableException;
@@ -25,7 +25,7 @@ public class ControllerExceptionHandler {
public ResultDTO exceptionHandler(Exception e) {
// 不是所有异常都需要打印完整堆栈,后续可以定义内部的Exception,便于判断
- if (e instanceof IllegalArgumentException || e instanceof OmsException) {
+ if (e instanceof IllegalArgumentException || e instanceof PowerJobException) {
log.warn("[ControllerException] http request failed, message is {}.", e.getMessage());
} else if (e instanceof HttpMessageNotReadableException || e instanceof MethodArgumentTypeMismatchException) {
log.warn("[ControllerException] invalid http request params, exception is {}.", e.getMessage());
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java
index 6f6f389c..300b55db 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java
@@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.web.controller;
import com.github.kfcfans.powerjob.common.InstanceStatus;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
@@ -145,10 +146,10 @@ public class InstanceController {
private String getTargetServer(Long instanceId) {
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfo == null) {
- throw new RuntimeException("invalid instanceId: " + instanceId);
+ throw new PowerJobException("invalid instanceId: " + instanceId);
}
Optional appInfoOpt = appInfoRepository.findById(instanceInfo.getAppId());
- return appInfoOpt.orElseThrow(() -> new RuntimeException("impossible")).getCurrentServer();
+ return appInfoOpt.orElseThrow(() -> new PowerJobException("impossible")).getCurrentServer();
}
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java
index 40fd0b6a..a846f73c 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java
@@ -150,8 +150,8 @@ public class OpenAPIController {
}
@PostMapping(OpenAPIConstant.RUN_WORKFLOW)
- public ResultDTO runWorkflow(Long workflowId, Long appId) {
- return ResultDTO.success(workflowService.runWorkflow(workflowId, appId));
+ public ResultDTO runWorkflow(Long workflowId, Long appId, @RequestParam(required = false) String initParams, @RequestParam(required = false) Long delay) {
+ return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, initParams, delay == null ? 0 : delay));
}
/* ************* Workflow Instance 区 ************* */
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowController.java
index 25095922..9eb1b589 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowController.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowController.java
@@ -79,7 +79,7 @@ public class WorkflowController {
@GetMapping("/run")
public ResultDTO runWorkflow(Long workflowId, Long appId) {
- return ResultDTO.success(workflowService.runWorkflow(workflowId, appId));
+ return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, null, 0));
}
private static PageResult convertPage(Page originPage) {
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/ModifyAppInfoRequest.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/ModifyAppInfoRequest.java
index 77ea5da3..bd007626 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/ModifyAppInfoRequest.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/ModifyAppInfoRequest.java
@@ -1,6 +1,6 @@
package com.github.kfcfans.powerjob.server.web.request;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
@@ -21,7 +21,7 @@ public class ModifyAppInfoRequest {
public void valid() {
CommonUtils.requireNonNull(appName, "appName can't be empty");
if (StringUtils.containsWhitespace(appName)) {
- throw new OmsException("appName can't contains white space!");
+ throw new PowerJobException("appName can't contains white space!");
}
}
}
diff --git a/powerjob-server/src/main/resources/application-daily.properties b/powerjob-server/src/main/resources/application-daily.properties
index 5eca462b..6d83511f 100644
--- a/powerjob-server/src/main/resources/application-daily.properties
+++ b/powerjob-server/src/main/resources/application-daily.properties
@@ -11,7 +11,7 @@ spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置,非核心依赖,通过配置 oms.mongodb.enable=false 来关闭 #######
oms.mongodb.enable=true
-spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily
+spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-daily
####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) #######
spring.mail.host=smtp.163.com
diff --git a/powerjob-server/src/main/resources/banner.txt b/powerjob-server/src/main/resources/banner.txt
index dffacc32..0d73c6d3 100644
--- a/powerjob-server/src/main/resources/banner.txt
+++ b/powerjob-server/src/main/resources/banner.txt
@@ -8,6 +8,7 @@ ${AnsiColor.GREEN}
░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████
░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░
${AnsiColor.BRIGHT_RED}
-* Maintainer: tengjiqi@gmail.com
+* Maintainer: tengjiqi@gmail.com & PowerJob-Team
+* OfficialWebsite: http://www.powerjob.tech/
* SourceCode: https://github.com/KFCFans/PowerJob
* PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4)
diff --git a/powerjob-server/src/main/resources/logback-dev.xml b/powerjob-server/src/main/resources/logback-dev.xml
index 94e07a92..ec9b4011 100644
--- a/powerjob-server/src/main/resources/logback-dev.xml
+++ b/powerjob-server/src/main/resources/logback-dev.xml
@@ -10,7 +10,7 @@
converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
+ value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{20}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
diff --git a/powerjob-server/src/main/resources/logback-product.xml b/powerjob-server/src/main/resources/logback-product.xml
index b14658f3..9f63badf 100644
--- a/powerjob-server/src/main/resources/logback-product.xml
+++ b/powerjob-server/src/main/resources/logback-product.xml
@@ -8,6 +8,13 @@
-->
+
+
+ ${CONSOLE_LOG_PATTERN}
+ utf8
+
+
+
${LOG_PATH}/powerjob-server-error.log
@@ -16,7 +23,7 @@
7
- %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{20} - %msg%n
UTF-8
@@ -53,7 +60,7 @@
7
- %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{20} - %msg%n
UTF-8
true
@@ -61,6 +68,7 @@
+
diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/CronTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/CronTest.java
new file mode 100644
index 00000000..98316099
--- /dev/null
+++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/CronTest.java
@@ -0,0 +1,26 @@
+package com.github.kfcfans.powerjob.server.test;
+
+
+import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
+import org.junit.Test;
+
+import java.util.Date;
+
+/**
+ * CRON 测试
+ *
+ * @author tjq
+ * @since 2020/10/8
+ */
+public class CronTest {
+
+ private static final String FIXED_CRON = "0 0 13 8 10 ? 2020-2020";
+
+ @Test
+ public void testFixedTimeCron() throws Exception {
+ CronExpression cronExpression = new CronExpression(FIXED_CRON);
+ System.out.println(cronExpression.getCronExpression());
+ System.out.println(cronExpression.getNextValidTimeAfter(new Date()));
+ }
+
+}
diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml
index 68ee9f0c..d95e203f 100644
--- a/powerjob-worker-agent/pom.xml
+++ b/powerjob-worker-agent/pom.xml
@@ -10,12 +10,12 @@
4.0.0
powerjob-worker-agent
- 3.2.3
+ 3.3.0
jar
- 3.2.3
+ 3.3.0
1.2.3
4.3.2
diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml
index 80a5d7e2..86835a94 100644
--- a/powerjob-worker-samples/pom.xml
+++ b/powerjob-worker-samples/pom.xml
@@ -10,11 +10,11 @@
4.0.0
powerjob-worker-samples
- 3.2.3
+ 3.3.0
2.2.6.RELEASE
- 3.2.3
+ 3.3.0
1.2.68
diff --git a/powerjob-worker-samples/src/main/resources/application.properties b/powerjob-worker-samples/src/main/resources/application.properties
index ef35ebd2..17bfa59f 100644
--- a/powerjob-worker-samples/src/main/resources/application.properties
+++ b/powerjob-worker-samples/src/main/resources/application.properties
@@ -2,12 +2,14 @@ server.port=8081
spring.jpa.open-in-view=false
-########### powerjob-worker 配置 ###########
+########### powerjob-worker 配置(老配置 powerjob.xxx 即将废弃,请使用 powerjob.worker.xxx) ###########
# akka 工作端口,可选,默认 27777
-powerjob.akka-port=27777
+powerjob.worker.akka-port=27777
# 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
-powerjob.app-name=powerjob-agent-test
+powerjob.worker.app-name=powerjob-agent-test
# 调度服务器地址,IP:Port 或 域名,多值逗号分隔
-powerjob.server-address=127.0.0.1:7700,127.0.0.1:7701
+powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701
# 持久化方式,可选,默认 disk
-powerjob.store-strategy=disk
\ No newline at end of file
+powerjob.worker.store-strategy=disk
+# 返回值最大长度,默认 8096
+powerjob.worker.max-result-length=4096
\ No newline at end of file
diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml
index 60a42a73..d5ba6e8e 100644
--- a/powerjob-worker-spring-boot-starter/pom.xml
+++ b/powerjob-worker-spring-boot-starter/pom.xml
@@ -10,11 +10,11 @@
4.0.0
powerjob-worker-spring-boot-starter
- 3.2.3
+ 3.3.0
jar
- 3.2.3
+ 3.3.0
2.2.6.RELEASE
diff --git a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java
index 6f6000c4..31615b54 100644
--- a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java
+++ b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java
@@ -3,9 +3,12 @@ package com.github.kfcfans.powerjob.worker.autoconfigure;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
+import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
@@ -19,32 +22,53 @@ import java.util.List;
*/
@Configuration
@EnableConfigurationProperties(PowerJobProperties.class)
+@Conditional(PowerJobAutoConfiguration.PowerJobWorkerCondition.class)
public class PowerJobAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public OhMyWorker initPowerJob(PowerJobProperties properties) {
+ PowerJobProperties.Worker worker = properties.getWorker();
+
// 服务器HTTP地址(端口号为 server.port,而不是 ActorSystem port),请勿添加任何前缀(http://)
- CommonUtils.requireNonNull(properties.getServerAddress(), "serverAddress can't be empty!");
- List serverAddress = Arrays.asList(properties.getServerAddress().split(","));
+ CommonUtils.requireNonNull(worker.getServerAddress(), "serverAddress can't be empty!");
+ List serverAddress = Arrays.asList(worker.getServerAddress().split(","));
// 1. 创建配置文件
OhMyConfig config = new OhMyConfig();
// 可以不显式设置,默认值 27777
- config.setPort(properties.getAkkaPort());
+ config.setPort(worker.getAkkaPort());
// appName,需要提前在控制台注册,否则启动报错
- config.setAppName(properties.getAppName());
+ config.setAppName(worker.getAppName());
config.setServerAddress(serverAddress);
// 如果没有大型 Map/MapReduce 的需求,建议使用内存来加速计算
// 有大型 Map/MapReduce 需求,可能产生大量子任务(Task)的场景,请使用 DISK,否则妥妥的 OutOfMemory
- config.setStoreStrategy(properties.getStoreStrategy());
+ config.setStoreStrategy(worker.getStoreStrategy());
// 启动测试模式,true情况下,不再尝试连接 server 并验证appName
- config.setEnableTestMode(properties.isEnableTestMode());
+ config.setEnableTestMode(worker.isEnableTestMode());
// 2. 创建 Worker 对象,设置配置文件
OhMyWorker ohMyWorker = new OhMyWorker();
ohMyWorker.setConfig(config);
return ohMyWorker;
}
+
+ static class PowerJobWorkerCondition extends AnyNestedCondition {
+
+ public PowerJobWorkerCondition() {
+ super(ConfigurationPhase.PARSE_CONFIGURATION);
+ }
+
+ @Deprecated
+ @ConditionalOnProperty(prefix = "powerjob", name = "server-address")
+ static class PowerJobProperty {
+
+ }
+
+ @ConditionalOnProperty(prefix = "powerjob.worker", name = "server-address")
+ static class PowerJobWorkerProperty {
+
+ }
+ }
}
diff --git a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java
index b8cf30b9..480e9e8d 100644
--- a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java
+++ b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java
@@ -3,8 +3,10 @@ package com.github.kfcfans.powerjob.worker.autoconfigure;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
-import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
/**
* PowerJob 配置项
@@ -12,33 +14,114 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
* @author songyinyin
* @since 2020/7/26 16:37
*/
-@Data
@ConfigurationProperties(prefix = "powerjob")
public class PowerJobProperties {
+
+ private final Worker worker = new Worker();
+
+ public Worker getWorker() {
+ return worker;
+ }
+
+ @Deprecated
+ @DeprecatedConfigurationProperty(replacement = "powerjob.worker.app-name")
+ public String getAppName() {
+ return getWorker().appName;
+ }
+
+ @Deprecated
+ public void setAppName(String appName) {
+ getWorker().setAppName(appName);
+ }
+
+ @Deprecated
+ @DeprecatedConfigurationProperty(replacement = "powerjob.worker.akka-port")
+ public int getAkkaPort() {
+ return getWorker().akkaPort;
+ }
+
+ @Deprecated
+ public void setAkkaPort(int akkaPort) {
+ getWorker().setAkkaPort(akkaPort);
+ }
+
+ @Deprecated
+ @DeprecatedConfigurationProperty(replacement = "powerjob.worker.server-address")
+ public String getServerAddress() {
+ return getWorker().serverAddress;
+ }
+
+ @Deprecated
+ public void setServerAddress(String serverAddress) {
+ getWorker().setServerAddress(serverAddress);
+ }
+
+ @Deprecated
+ @DeprecatedConfigurationProperty(replacement = "powerjob.worker.store-strategy")
+ public StoreStrategy getStoreStrategy() {
+ return getWorker().storeStrategy;
+ }
+
+ @Deprecated
+ public void setStoreStrategy(StoreStrategy storeStrategy) {
+ getWorker().setStoreStrategy(storeStrategy);
+ }
+
+ @Deprecated
+ @DeprecatedConfigurationProperty(replacement = "powerjob.worker.max-result-length")
+ public int getMaxResultLength() {
+ return getWorker().maxResultLength;
+ }
+
+ @Deprecated
+ public void setMaxResultLength(int maxResultLength) {
+ getWorker().setMaxResultLength(maxResultLength);
+ }
+
+ @Deprecated
+ @DeprecatedConfigurationProperty(replacement = "powerjob.worker.enable-test-mode")
+ public boolean isEnableTestMode() {
+ return getWorker().enableTestMode;
+ }
+
+ @Deprecated
+ public void setEnableTestMode(boolean enableTestMode) {
+ getWorker().setEnableTestMode(enableTestMode);
+ }
+
+
+
/**
- * 应用名称,需要提前在控制台注册,否则启动报错
+ * 客户端 配置项
*/
- private String appName;
- /**
- * 启动 akka 端口
- */
- private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT;
- /**
- * 调度服务器地址,ip:port 或 域名,多个用英文逗号分隔
- */
- private String serverAddress;
- /**
- * 本地持久化方式,默认使用磁盘
- */
- private StoreStrategy storeStrategy = StoreStrategy.DISK;
- /**
- * 最大返回值长度,超过会被截断
- * {@link ProcessResult}#msg 的最大长度
- */
- private int maxResultLength = 8096;
- /**
- * 启动测试模式,true情况下,不再尝试连接 server 并验证appName。
- * true -> 用于本地写单元测试调试; false -> 默认值,标准模式
- */
- private boolean enableTestMode = false;
+ @Setter
+ @Getter
+ public static class Worker {
+ /**
+ * 应用名称,需要提前在控制台注册,否则启动报错
+ */
+ private String appName;
+ /**
+ * 启动 akka 端口
+ */
+ private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT;
+ /**
+ * 调度服务器地址,ip:port 或 域名,多个用英文逗号分隔
+ */
+ private String serverAddress;
+ /**
+ * 本地持久化方式,默认使用磁盘
+ */
+ private StoreStrategy storeStrategy = StoreStrategy.DISK;
+ /**
+ * 最大返回值长度,超过会被截断
+ * {@link ProcessResult}#msg 的最大长度
+ */
+ private int maxResultLength = 8096;
+ /**
+ * 启动测试模式,true情况下,不再尝试连接 server 并验证appName。
+ * true -> 用于本地写单元测试调试; false -> 默认值,标准模式
+ */
+ private boolean enableTestMode = false;
+ }
}
diff --git a/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json b/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json
index 1fbedb0a..5fe74cec 100644
--- a/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json
+++ b/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json
@@ -4,46 +4,106 @@
"name": "powerjob",
"type": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
+ },
+ {
+ "name": "powerjob.worker",
+ "type": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker",
+ "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
+ "sourceMethod": "getWorker()"
}
],
"properties": [
{
- "name": "powerjob.app-name",
- "type": "java.lang.String",
- "description": "应用名称,需要提前在控制台注册,否则启动报错",
- "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
+ "name": "powerjob.worker.akka-port",
+ "type": "java.lang.Integer",
+ "description": "启动 akka 端口",
+ "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
},
{
- "name": "powerjob.max-result-length",
+ "name": "powerjob.worker.app-name",
+ "type": "java.lang.String",
+ "description": "应用名称,需要提前在控制台注册,否则启动报错",
+ "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
+ },
+ {
+ "name": "powerjob.worker.enable-test-mode",
+ "type": "java.lang.Boolean",
+ "description": "启动测试模式,true情况下,不再尝试连接 server 并验证appName。 true -> 用于本地写单元测试调试; false -> 默认值,标准模式",
+ "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker",
+ "defaultValue": false
+ },
+ {
+ "name": "powerjob.worker.max-result-length",
"type": "java.lang.Integer",
"description": "最大返回值长度,超过会被截断 {@link ProcessResult}#msg 的最大长度",
- "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
+ "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker",
"defaultValue": 8096
},
+ {
+ "name": "powerjob.worker.server-address",
+ "type": "java.lang.String",
+ "description": "调度服务器地址,ip:port 或 域名,多个用英文逗号分隔",
+ "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
+ },
+ {
+ "name": "powerjob.worker.store-strategy",
+ "type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
+ "description": "本地持久化方式,默认使用磁盘",
+ "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
+ },
{
"name": "powerjob.akka-port",
"type": "java.lang.Integer",
- "description": "启动 akka 端口",
- "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
+ "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
+ "deprecated": true,
+ "deprecation": {
+ "replacement": "powerjob.worker.akka-port"
+ }
},
{
- "name": "powerjob.server-address",
+ "name": "powerjob.app-name",
"type": "java.lang.String",
- "description": "调度服务器地址,ip:port 或 域名,多值用英文逗号分隔",
- "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
- },
- {
- "name": "powerjob.store-strategy",
- "type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
- "description": "本地持久化方式,默认使用磁盘",
- "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
+ "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
+ "deprecated": true,
+ "deprecation": {
+ "replacement": "powerjob.worker.app-name"
+ }
},
{
"name": "powerjob.enable-test-mode",
"type": "java.lang.Boolean",
- "description": "启动测试模式,true情况下,不再尝试连接 server 并验证appName。true -> 用于本地写单元测试调试; false -> 默认值,标准模式",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
- "defaultValue": false
+ "deprecated": true,
+ "deprecation": {
+ "replacement": "powerjob.worker.enable-test-mode"
+ }
+ },
+ {
+ "name": "powerjob.max-result-length",
+ "type": "java.lang.Integer",
+ "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
+ "deprecated": true,
+ "deprecation": {
+ "replacement": "powerjob.worker.max-result-length"
+ }
+ },
+ {
+ "name": "powerjob.server-address",
+ "type": "java.lang.String",
+ "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
+ "deprecated": true,
+ "deprecation": {
+ "replacement": "powerjob.worker.server-address"
+ }
+ },
+ {
+ "name": "powerjob.store-strategy",
+ "type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
+ "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
+ "deprecated": true,
+ "deprecation": {
+ "replacement": "powerjob.worker.store-strategy"
+ }
}
],
"hints": []
diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml
index e0ac3936..44d95e5d 100644
--- a/powerjob-worker/pom.xml
+++ b/powerjob-worker/pom.xml
@@ -10,12 +10,12 @@
4.0.0
powerjob-worker
- 3.2.3
+ 3.3.0
jar
5.2.4.RELEASE
- 3.2.3
+ 3.3.0
1.4.200
3.4.2
5.6.1
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java
index 3a78f2f9..1cde2378 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java
@@ -5,7 +5,7 @@ import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
@@ -163,16 +163,16 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
return appId;
}else {
log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
- throw new OmsException(resultDTO.getMessage());
+ throw new PowerJobException(resultDTO.getMessage());
}
- }catch (OmsException oe) {
+ }catch (PowerJobException oe) {
throw oe;
}catch (Exception ignore) {
log.warn("[OhMyWorker] assert appName by url({}) failed, please check the server address.", realUrl);
}
}
log.error("[OhMyWorker] no available server in {}.", config.getServerAddress());
- throw new OmsException("no server available!");
+ throw new PowerJobException("no server available!");
}
@Override
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OmsBannerPrinter.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OmsBannerPrinter.java
index 1b1daf6b..227d5bec 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OmsBannerPrinter.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OmsBannerPrinter.java
@@ -11,21 +11,28 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public final class OmsBannerPrinter {
- private static final String BANNER = "\n ███████ ██ ██ \n" +
- "░██░░░░██ ░██ ░██ \n" +
- "░██ ░██ ██████ ███ ██ █████ ██████ ░██ ██████ ░██ \n" +
- "░███████ ██░░░░██░░██ █ ░██ ██░░░██░░██░░█ ░██ ██░░░░██░██████ \n" +
+ private static final String BANNER = "" +
+ "\n" +
+ " ███████ ██ ██\n" +
+ "░██░░░░██ ░██ ░██\n" +
+ "░██ ░██ ██████ ███ ██ █████ ██████ ░██ ██████ ░██\n" +
+ "░███████ ██░░░░██░░██ █ ░██ ██░░░██░░██░░█ ░██ ██░░░░██░██████\n" +
"░██░░░░ ░██ ░██ ░██ ███░██░███████ ░██ ░ ░██░██ ░██░██░░░██\n" +
"░██ ░██ ░██ ░████░████░██░░░░ ░██ ██ ░██░██ ░██░██ ░██\n" +
- "░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████ \n" +
- "░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░ \n";
+ "░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████\n" +
+ "░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░\n" +
+ "\n" +
+ "* Maintainer: tengjiqi@gmail.com & PowerJob-Team\n" +
+ "* OfficialWebsite: http://www.powerjob.tech/\n" +
+ "* SourceCode: https://github.com/KFCFans/PowerJob\n" +
+ "\n";
public static void print() {
log.info(BANNER);
String version = OmsWorkerVersion.getVersion();
version = (version != null) ? " (v" + version + ")" : "";
- log.info(":: OhMyScheduler Worker :: {}", version);
+ log.info(":: PowerJob Worker :: {}", version);
}
}
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java
index 6dfacce6..8efe4adf 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java
@@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.worker.container;
import com.github.kfcfans.powerjob.common.ContainerConstant;
-import com.github.kfcfans.powerjob.common.OmsException;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
@@ -106,7 +106,7 @@ public class OmsJarContainer implements OmsContainer {
if (propertiesURLStream == null) {
log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath());
- throw new OmsException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME);
+ throw new PowerJobException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME);
}
properties.load(propertiesURLStream);
@@ -115,7 +115,7 @@ public class OmsJarContainer implements OmsContainer {
String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY);
if (StringUtils.isEmpty(packageName)) {
log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId);
- throw new OmsException("invalid jar file");
+ throw new PowerJobException("invalid jar file");
}
// 加载用户类
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java
index e9957127..38d4937f 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java
@@ -318,12 +318,12 @@ public class ProcessorTracker {
break;
default:
log.warn("[ProcessorTracker-{}] unknown processor type: {}.", instanceId, processorType);
- throw new OmsException("unknown processor type of " + processorType);
+ throw new PowerJobException("unknown processor type of " + processorType);
}
if (processor == null) {
log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo);
- throw new OmsException("fetch Processor failed");
+ throw new PowerJobException("fetch Processor failed");
}
}
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java
index 01e1f9ac..e3f6a08e 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java
@@ -110,7 +110,7 @@ public class CommonTaskTracker extends TaskTracker {
log.info("[TaskTracker-{}] create root task successfully.", instanceId);
}else {
log.error("[TaskTracker-{}] create root task failed.", instanceId);
- throw new OmsException("create root task failed for instance: " + instanceId);
+ throw new PowerJobException("create root task failed for instance: " + instanceId);
}
}
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java
index 00cd28cc..f56e9a62 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java
@@ -91,7 +91,7 @@ public class FrequentTaskTracker extends TaskTracker {
if (timeExpressionType == TimeExpressionType.FIX_RATE) {
// 固定频率需要设置最小间隔
if (timeParams < MIN_INTERVAL) {
- throw new OmsException("time interval too small, please set the timeExpressionInfo >= 1000");
+ throw new PowerJobException("time interval too small, please set the timeExpressionInfo >= 1000");
}
scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS);
}else {
@@ -123,6 +123,9 @@ public class FrequentTaskTracker extends TaskTracker {
history.add(subDetail);
});
+ // 按 subInstanceId 排序 issue#63
+ history.sort((o1, o2) -> (int) (o2.getSubInstanceId() - o1.getSubInstanceId()));
+
detail.setSubInstanceDetails(history);
return detail;
}