Merge branch 'v3.3.0' into jenkins_auto_build

This commit is contained in:
tjq 2020-10-08 13:16:23 +08:00
commit 1b05ff7082
49 changed files with 535 additions and 231 deletions

View File

@ -47,7 +47,7 @@ PowerJob 的设计目标为企业级的分布式任务调度平台,即成为
| 在线任务治理 | 不支持 | 支持 | 支持 | **支持** | | 在线任务治理 | 不支持 | 支持 | 支持 | **支持** |
| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** | | 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** |
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** | | 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** |
| 报警监控 | 无 | 邮件 | 短信 | **邮件,提供接口允许开发者扩展** | | 报警监控 | 无 | 邮件 | 短信 | **邮件与钉钉,并支持开发者扩展** |
| 系统依赖 | JDBC支持的关系型数据库MySQL、Oracle... | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库MySQL、Oracle...** | | 系统依赖 | JDBC支持的关系型数据库MySQL、Oracle... | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库MySQL、Oracle...** |
| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** | | DAG工作流 | 不支持 | 不支持 | 支持 | **支持** |

View File

@ -1,15 +1,17 @@
/* /*
Navicat Premium Data Transfer Navicat Premium Data Transfer
Source Server : Local MySQL
Source Server Type : MySQL Source Server Type : MySQL
Source Server Version : 80020 Source Server Version : 80021
Source Schema : powerjob-product Source Host : localhost:3306
Source Schema : powerjob-daily
Target Server Type : MySQL Target Server Type : MySQL
Target Server Version : 80020 Target Server Version : 80021
File Encoding : 65001 File Encoding : 65001
Date: 23/06/2020 22:30:06 Date: 08/10/2020 12:39:10
*/ */
SET NAMES utf8mb4; SET NAMES utf8mb4;
@ -28,7 +30,7 @@ CREATE TABLE `app_info` (
`password` varchar(255) DEFAULT NULL, `password` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `appNameUK` (`app_name`) UNIQUE KEY `appNameUK` (`app_name`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ---------------------------- -- ----------------------------
-- Table structure for container_info -- Table structure for container_info
@ -62,10 +64,10 @@ CREATE TABLE `instance_info` (
`gmt_create` datetime(6) DEFAULT NULL, `gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL,
`instance_id` bigint DEFAULT NULL, `instance_id` bigint DEFAULT NULL,
`instance_params` text, `instance_params` longtext,
`job_id` bigint DEFAULT NULL, `job_id` bigint DEFAULT NULL,
`last_report_time` bigint DEFAULT NULL, `last_report_time` bigint DEFAULT NULL,
`result` text, `result` longtext,
`running_times` bigint DEFAULT NULL, `running_times` bigint DEFAULT NULL,
`status` int DEFAULT NULL, `status` int DEFAULT NULL,
`task_tracker_address` varchar(255) DEFAULT NULL, `task_tracker_address` varchar(255) DEFAULT NULL,
@ -75,7 +77,7 @@ CREATE TABLE `instance_info` (
KEY `IDX5b1nhpe5je7gc5s1ur200njr7` (`job_id`), KEY `IDX5b1nhpe5je7gc5s1ur200njr7` (`job_id`),
KEY `IDXjnji5lrr195kswk6f7mfhinrs` (`app_id`), KEY `IDXjnji5lrr195kswk6f7mfhinrs` (`app_id`),
KEY `IDXa98hq3yu0l863wuotdjl7noum` (`instance_id`) KEY `IDXa98hq3yu0l863wuotdjl7noum` (`instance_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ---------------------------- -- ----------------------------
-- Table structure for job_info -- Table structure for job_info
@ -101,7 +103,7 @@ CREATE TABLE `job_info` (
`min_memory_space` double NOT NULL, `min_memory_space` double NOT NULL,
`next_trigger_time` bigint DEFAULT NULL, `next_trigger_time` bigint DEFAULT NULL,
`notify_user_ids` varchar(255) DEFAULT NULL, `notify_user_ids` varchar(255) DEFAULT NULL,
`processor_info` text, `processor_info` longtext,
`processor_type` int DEFAULT NULL, `processor_type` int DEFAULT NULL,
`status` int DEFAULT NULL, `status` int DEFAULT NULL,
`task_retry_num` int DEFAULT NULL, `task_retry_num` int DEFAULT NULL,
@ -109,7 +111,7 @@ CREATE TABLE `job_info` (
`time_expression_type` int DEFAULT NULL, `time_expression_type` int DEFAULT NULL,
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `IDXk2xprmn3lldmlcb52i36udll1` (`app_id`) KEY `IDXk2xprmn3lldmlcb52i36udll1` (`app_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ---------------------------- -- ----------------------------
-- Table structure for oms_lock -- Table structure for oms_lock
@ -124,7 +126,7 @@ CREATE TABLE `oms_lock` (
`ownerip` varchar(255) DEFAULT NULL, `ownerip` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `lockNameUK` (`lock_name`) UNIQUE KEY `lockNameUK` (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ---------------------------- -- ----------------------------
-- Table structure for server_info -- Table structure for server_info
@ -166,7 +168,7 @@ CREATE TABLE `workflow_info` (
`max_wf_instance_num` int DEFAULT NULL, `max_wf_instance_num` int DEFAULT NULL,
`next_trigger_time` bigint DEFAULT NULL, `next_trigger_time` bigint DEFAULT NULL,
`notify_user_ids` varchar(255) DEFAULT NULL, `notify_user_ids` varchar(255) DEFAULT NULL,
`pedag` text, `pedag` longtext,
`status` int DEFAULT NULL, `status` int DEFAULT NULL,
`time_expression` varchar(255) DEFAULT NULL, `time_expression` varchar(255) DEFAULT NULL,
`time_expression_type` int DEFAULT NULL, `time_expression_type` int DEFAULT NULL,
@ -174,7 +176,7 @@ CREATE TABLE `workflow_info` (
`wf_name` varchar(255) DEFAULT NULL, `wf_name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `IDX7uo5w0e3beeho3fnx9t7eiol3` (`app_id`) KEY `IDX7uo5w0e3beeho3fnx9t7eiol3` (`app_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ---------------------------- -- ----------------------------
-- Table structure for workflow_instance_info -- Table structure for workflow_instance_info
@ -184,15 +186,16 @@ CREATE TABLE `workflow_instance_info` (
`id` bigint NOT NULL AUTO_INCREMENT, `id` bigint NOT NULL AUTO_INCREMENT,
`actual_trigger_time` bigint DEFAULT NULL, `actual_trigger_time` bigint DEFAULT NULL,
`app_id` bigint DEFAULT NULL, `app_id` bigint DEFAULT NULL,
`dag` text, `dag` longtext,
`finished_time` bigint DEFAULT NULL, `finished_time` bigint DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL, `gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL,
`result` text, `result` longtext,
`status` int DEFAULT NULL, `status` int DEFAULT NULL,
`wf_init_params` longtext,
`wf_instance_id` bigint DEFAULT NULL, `wf_instance_id` bigint DEFAULT NULL,
`workflow_id` bigint DEFAULT NULL, `workflow_id` bigint DEFAULT NULL,
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
SET FOREIGN_KEY_CHECKS = 1; SET FOREIGN_KEY_CHECKS = 1;

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId> <artifactId>powerjob-client</artifactId>
<version>3.2.3</version> <version>3.3.0</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<powerjob.common.version>3.2.3</powerjob.common.version> <powerjob.common.version>3.3.0</powerjob.common.version>
<junit.version>5.6.1</junit.version> <junit.version>5.6.1</junit.version>
</properties> </properties>

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.client; package com.github.kfcfans.powerjob.client;
import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.OpenAPIConstant; import com.github.kfcfans.powerjob.common.OpenAPIConstant;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
@ -68,7 +68,7 @@ public class OhMyClient {
currentAddress = addr; currentAddress = addr;
break; break;
}else { }else {
throw new OmsException(resultDTO.getMessage()); throw new PowerJobException(resultDTO.getMessage());
} }
} }
}catch (IOException ignore) { }catch (IOException ignore) {
@ -76,7 +76,7 @@ public class OhMyClient {
} }
if (StringUtils.isEmpty(currentAddress)) { if (StringUtils.isEmpty(currentAddress)) {
throw new OmsException("no server available"); throw new PowerJobException("no server available");
} }
log.info("[OhMyClient] {}'s oms-client bootstrap successfully, using server: {}", appName, currentAddress); log.info("[OhMyClient] {}'s oms-client bootstrap successfully, using server: {}", appName, currentAddress);
} }
@ -108,7 +108,7 @@ public class OhMyClient {
request.setAppId(appId); request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JsonUtils.toJSONStringUnsafe(request); String json = JsonUtils.toJSONStringUnsafe(request);
String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(json, jsonType)); String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json));
return JsonUtils.parseObject(post, ResultDTO.class); return JsonUtils.parseObject(post, ResultDTO.class);
} }
@ -283,7 +283,7 @@ public class OhMyClient {
request.setAppId(appId); request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JsonUtils.toJSONStringUnsafe(request); String json = JsonUtils.toJSONStringUnsafe(request);
String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(json, jsonType)); String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json));
return JsonUtils.parseObject(post, ResultDTO.class); return JsonUtils.parseObject(post, ResultDTO.class);
} }
@ -349,17 +349,26 @@ public class OhMyClient {
/** /**
* 运行工作流 * 运行工作流
* @param workflowId workflowId * @param workflowId 工作流ID
* @param initParams 启动参数
* @param delayMS 延迟时间单位毫秒 ms
* @return 工作流实例ID * @return 工作流实例ID
* @throws Exception 异常 * @throws Exception 异常信息
*/ */
public ResultDTO<Long> runWorkflow(Long workflowId) throws Exception { public ResultDTO<Long> runWorkflow(Long workflowId, String initParams, long delayMS) throws Exception {
FormBody.Builder builder = new FormBody.Builder() FormBody.Builder builder = new FormBody.Builder()
.add("workflowId", workflowId.toString()) .add("workflowId", workflowId.toString())
.add("appId", appId.toString()); .add("appId", appId.toString())
.add("delay", String.valueOf(delayMS));
if (StringUtils.isNotEmpty(initParams)) {
builder.add("initParams", initParams);
}
String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build()); String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build());
return JsonUtils.parseObject(post, ResultDTO.class); return JsonUtils.parseObject(post, ResultDTO.class);
} }
public ResultDTO<Long> runWorkflow(Long workflowId) throws Exception {
return runWorkflow(workflowId, null, 0);
}
/* ************* Workflow Instance 区 ************* */ /* ************* Workflow Instance 区 ************* */
/** /**
@ -426,6 +435,6 @@ public class OhMyClient {
} }
log.error("[OhMyClient] do post for path: {} failed because of no server available in {}.", path, allAddress); log.error("[OhMyClient] do post for path: {} failed because of no server available in {}.", path, allAddress);
throw new OmsException("no server available when send post"); throw new PowerJobException("no server available when send post");
} }
} }

View File

@ -1,7 +1,11 @@
import com.github.kfcfans.powerjob.client.OhMyClient; import com.github.kfcfans.powerjob.client.OhMyClient;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -20,7 +24,23 @@ public class TestWorkflow {
@BeforeAll @BeforeAll
public static void initClient() throws Exception { public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test", null); ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
}
@Test
public void initTestData() throws Exception {
SaveJobInfoRequest base = new SaveJobInfoRequest();
base.setJobName("DAG-Node-");
base.setTimeExpressionType(TimeExpressionType.WORKFLOW);
base.setExecuteType(ExecuteType.STANDALONE);
base.setProcessorType(ProcessorType.EMBEDDED_JAVA);
base.setProcessorInfo("com.github.kfcfans.powerjob.samples.workflow.WorkflowStandaloneProcessor");
for (int i = 0; i < 5; i++) {
SaveJobInfoRequest request = JsonUtils.parseObject(JsonUtils.toBytes(base), SaveJobInfoRequest.class);
request.setJobName(request.getJobName() + i);
System.out.println(ohMyClient.saveJob(request));
}
} }
@Test @Test
@ -30,8 +50,8 @@ public class TestWorkflow {
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList(); List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList(); List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
nodes.add(new PEWorkflowDAG.Node(1L, "node-1")); nodes.add(new PEWorkflowDAG.Node(1L, "DAG-Node-1"));
nodes.add(new PEWorkflowDAG.Node(2L, "node-2")); nodes.add(new PEWorkflowDAG.Node(2L, "DAG-Node-2"));
edges.add(new PEWorkflowDAG.Edge(1L, 2L)); edges.add(new PEWorkflowDAG.Edge(1L, 2L));
@ -81,4 +101,9 @@ public class TestWorkflow {
public void testFetchWfInstanceInfo() throws Exception { public void testFetchWfInstanceInfo() throws Exception {
System.out.println(ohMyClient.fetchWorkflowInstanceInfo(149962433421639744L)); System.out.println(ohMyClient.fetchWorkflowInstanceInfo(149962433421639744L));
} }
@Test
public void testRunWorkflowPlus() throws Exception {
System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 2", 90000));
}
} }

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId> <artifactId>powerjob-common</artifactId>
<version>3.2.3</version> <version>3.3.0</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
@ -18,7 +18,7 @@
<commons.lang.version>3.10</commons.lang.version> <commons.lang.version>3.10</commons.lang.version>
<commons.io.version>2.6</commons.io.version> <commons.io.version>2.6</commons.io.version>
<guava.version>29.0-jre</guava.version> <guava.version>29.0-jre</guava.version>
<okhttp.version>4.4.1</okhttp.version> <okhttp.version>3.14.9</okhttp.version>
<akka.version>2.6.4</akka.version> <akka.version>2.6.4</akka.version>
<junit.version>5.6.1</junit.version> <junit.version>5.6.1</junit.version>
</properties> </properties>

View File

@ -1,29 +0,0 @@
package com.github.kfcfans.powerjob.common;
/**
* OhMyScheduler 运行时异常
*
* @author tjq
* @since 2020/5/26
*/
public class OmsException extends RuntimeException {
public OmsException() {
}
public OmsException(String message) {
super(message);
}
public OmsException(String message, Throwable cause) {
super(message, cause);
}
public OmsException(Throwable cause) {
super(cause);
}
public OmsException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,29 @@
package com.github.kfcfans.powerjob.common;
/**
* PowerJob 运行时异常
*
* @author tjq
* @since 2020/5/26
*/
public class PowerJobException extends RuntimeException {
public PowerJobException() {
}
public PowerJobException(String message) {
super(message);
}
public PowerJobException(String message, Throwable cause) {
super(message, cause);
}
public PowerJobException(Throwable cause) {
super(cause);
}
public PowerJobException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -20,12 +20,16 @@ public class InstanceInfoDTO {
private Long appId; private Long appId;
// 任务实例ID // 任务实例ID
private Long instanceId; private Long instanceId;
// 工作流实例ID
private Long wfInstanceId;
// 任务实例参数 // 任务实例参数
private String instanceParams; private String instanceParams;
/** /**
* 任务状态 {@link InstanceStatus} * 任务状态 {@link InstanceStatus}
*/ */
private int status; private int status;
// 该任务实例的类型普通/工作流InstanceType
private Integer type;
// 执行结果 // 执行结果
private String result; private String result;
// 预计触发时间 // 预计触发时间

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.common.utils; package com.github.kfcfans.powerjob.common.utils;
import com.github.kfcfans.powerjob.common.OmsConstant; import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.time.DateFormatUtils;
@ -121,11 +121,11 @@ public class CommonUtils {
public static <T> T requireNonNull(T obj, String msg) { public static <T> T requireNonNull(T obj, String msg) {
if (obj == null) { if (obj == null) {
throw new OmsException(msg); throw new PowerJobException(msg);
} }
if (obj instanceof String) { if (obj instanceof String) {
if (StringUtils.isEmpty((String) obj)) { if (StringUtils.isEmpty((String) obj)) {
throw new OmsException(msg); throw new PowerJobException(msg);
} }
} }
return obj; return obj;

View File

@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.common.utils;
import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
/** /**
@ -54,6 +54,6 @@ public class JsonUtils {
}catch (Exception e) { }catch (Exception e) {
ExceptionUtils.rethrow(e); ExceptionUtils.rethrow(e);
} }
throw new OmsException("impossible"); throw new PowerJobException("impossible");
} }
} }

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId> <artifactId>powerjob-server</artifactId>
<version>3.2.3</version> <version>3.3.0</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<swagger.version>2.9.2</swagger.version> <swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version> <springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.common.version>3.2.3</powerjob.common.version> <powerjob.common.version>3.3.0</powerjob.common.version>
<!-- 数据库驱动版本使用的是spring-boot-dependencies管理的版本 --> <!-- 数据库驱动版本使用的是spring-boot-dependencies管理的版本 -->
<mysql.version>8.0.19</mysql.version> <mysql.version>8.0.19</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version> <ojdbc.version>19.7.0.0</ojdbc.version>
@ -57,6 +57,11 @@
<artifactId>ojdbc8</artifactId> <artifactId>ojdbc8</artifactId>
<version>${ojdbc.version}</version> <version>${ojdbc.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.oracle.database.nls</groupId>
<artifactId>orai18n</artifactId>
<version>${ojdbc.version}</version>
</dependency>
<!-- sqlserver --> <!-- sqlserver -->
<dependency> <dependency>
<groupId>com.microsoft.sqlserver</groupId> <groupId>com.microsoft.sqlserver</groupId>
@ -166,6 +171,12 @@
<groupId>com.aliyun</groupId> <groupId>com.aliyun</groupId>
<artifactId>alibaba-dingtalk-service-sdk</artifactId> <artifactId>alibaba-dingtalk-service-sdk</artifactId>
<version>${dingding.version}</version> <version>${dingding.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>

View File

@ -7,7 +7,7 @@ import com.dingtalk.api.request.OapiMessageCorpconversationAsyncsendV2Request;
import com.dingtalk.api.request.OapiUserGetByMobileRequest; import com.dingtalk.api.request.OapiUserGetByMobileRequest;
import com.dingtalk.api.response.OapiGettokenResponse; import com.dingtalk.api.response.OapiGettokenResponse;
import com.dingtalk.api.response.OapiUserGetByMobileResponse; import com.dingtalk.api.response.OapiUserGetByMobileResponse;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -52,7 +52,7 @@ public class DingTalkUtils implements Closeable {
refreshAccessToken(appKey, appSecret); refreshAccessToken(appKey, appSecret);
if (StringUtils.isEmpty(accessToken)) { if (StringUtils.isEmpty(accessToken)) {
throw new OmsException("fetch AccessToken failed, please check your appKey & appSecret"); throw new PowerJobException("fetch AccessToken failed, please check your appKey & appSecret");
} }
scheduledPool = Executors.newSingleThreadScheduledExecutor(); scheduledPool = Executors.newSingleThreadScheduledExecutor();
@ -91,7 +91,7 @@ public class DingTalkUtils implements Closeable {
return execute.getUserid(); return execute.getUserid();
} }
log.info("[DingTalkUtils] fetch userId by mobile({}) failed,reason is {}.", mobile, execute.getErrmsg()); log.info("[DingTalkUtils] fetch userId by mobile({}) failed,reason is {}.", mobile, execute.getErrmsg());
throw new OmsException("fetch userId by phone number failed, reason is " + execute.getErrmsg()); throw new PowerJobException("fetch userId by phone number failed, reason is " + execute.getErrmsg());
} }
public void sendMarkdownAsync(String title, List<MarkdownEntity> entities, String userList, Long agentId) throws Exception { public void sendMarkdownAsync(String title, List<MarkdownEntity> entities, String userList, Long agentId) throws Exception {

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.server.common.utils; package com.github.kfcfans.powerjob.server.common.utils;
import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.server.model.WorkflowDAG; import com.github.kfcfans.powerjob.server.model.WorkflowDAG;
@ -76,7 +76,7 @@ public class WorkflowDAGUtils {
Map<Long, WorkflowDAG.Node> id2Node = Maps.newHashMap(); Map<Long, WorkflowDAG.Node> id2Node = Maps.newHashMap();
if (PEWorkflowDAG.getNodes() == null || PEWorkflowDAG.getNodes().isEmpty()) { if (PEWorkflowDAG.getNodes() == null || PEWorkflowDAG.getNodes().isEmpty()) {
throw new OmsException("empty graph"); throw new PowerJobException("empty graph");
} }
// 创建节点 // 创建节点
@ -95,7 +95,7 @@ public class WorkflowDAGUtils {
WorkflowDAG.Node to = id2Node.get(edge.getTo()); WorkflowDAG.Node to = id2Node.get(edge.getTo());
if (from == null || to == null) { if (from == null || to == null) {
throw new OmsException("Illegal Edge: " + JsonUtils.toJSONString(edge)); throw new PowerJobException("Illegal Edge: " + JsonUtils.toJSONString(edge));
} }
from.getSuccessors().add(to); from.getSuccessors().add(to);
@ -106,7 +106,7 @@ public class WorkflowDAGUtils {
// 合法性校验至少存在一个顶点 // 合法性校验至少存在一个顶点
if (rootIds.size() < 1) { if (rootIds.size() < 1) {
throw new OmsException("Illegal DAG: " + JsonUtils.toJSONString(PEWorkflowDAG)); throw new PowerJobException("Illegal DAG: " + JsonUtils.toJSONString(PEWorkflowDAG));
} }
List<WorkflowDAG.Node> roots = Lists.newLinkedList(); List<WorkflowDAG.Node> roots = Lists.newLinkedList();

View File

@ -36,6 +36,11 @@ public class WorkflowInstanceInfoDO {
// workflow 状态WorkflowInstanceStatus // workflow 状态WorkflowInstanceStatus
private Integer status; private Integer status;
// 工作流启动参数
@Lob
@Column
private String wfInitParams;
@Lob @Lob
@Column @Column
private String dag; private String dag;

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.powerjob.server.service; package com.github.kfcfans.powerjob.server.service;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -28,11 +28,11 @@ public class AppInfoService {
*/ */
public Long assertApp(String appName, String password) { public Long assertApp(String appName, String password) {
AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new OmsException("can't find appInfo by appName: " + appName)); AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new PowerJobException("can't find appInfo by appName: " + appName));
if (Objects.equals(appInfo.getPassword(), password)) { if (Objects.equals(appInfo.getPassword(), password)) {
return appInfo.getId(); return appInfo.getId();
} }
throw new OmsException("password error!"); throw new PowerJobException("password error!");
} }
} }

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.service; package com.github.kfcfans.powerjob.server.service;
import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.JobInfoDTO; import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
@ -70,7 +71,7 @@ public class JobService {
jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV()); jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
if (jobInfoDO.getMaxWorkerCount() == null) { if (jobInfoDO.getMaxWorkerCount() == null) {
jobInfoDO.setMaxInstanceNum(0); jobInfoDO.setMaxWorkerCount(0);
} }
// 转化报警用户列表 // 转化报警用户列表
@ -78,7 +79,7 @@ public class JobService {
jobInfoDO.setNotifyUserIds(SJ.commaJoiner.join(request.getNotifyUserIds())); jobInfoDO.setNotifyUserIds(SJ.commaJoiner.join(request.getNotifyUserIds()));
} }
refreshJob(jobInfoDO); calculateNextTriggerTime(jobInfoDO);
if (request.getId() == null) { if (request.getId() == null) {
jobInfoDO.setGmtCreate(new Date()); jobInfoDO.setGmtCreate(new Date());
} }
@ -143,7 +144,7 @@ public class JobService {
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId)); JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId));
jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV()); jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV());
refreshJob(jobInfoDO); calculateNextTriggerTime(jobInfoDO);
jobInfoRepository.saveAndFlush(jobInfoDO); jobInfoRepository.saveAndFlush(jobInfoDO);
} }
@ -184,7 +185,7 @@ public class JobService {
}); });
} }
private void refreshJob(JobInfoDO jobInfoDO) throws Exception { private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws Exception {
// 计算下次调度时间 // 计算下次调度时间
Date now = new Date(); Date now = new Date();
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
@ -192,6 +193,9 @@ public class JobService {
if (timeExpressionType == TimeExpressionType.CRON) { if (timeExpressionType == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(now); Date nextValidTime = cronExpression.getNextValidTimeAfter(now);
if (nextValidTime == null) {
throw new PowerJobException("cron expression is out of date: " + jobInfoDO.getTimeExpression());
}
jobInfoDO.setNextTriggerTime(nextValidTime.getTime()); jobInfoDO.setNextTriggerTime(nextValidTime.getTime());
}else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) { }else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) {
jobInfoDO.setTimeExpression(null); jobInfoDO.setTimeExpression(null);

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.server.service.alarm.impl; package com.github.kfcfans.powerjob.server.service.alarm.impl;
import com.github.kfcfans.powerjob.common.OmsConstant; import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.SJ; import com.github.kfcfans.powerjob.server.common.SJ;
@ -55,7 +55,7 @@ public class DingTalkAlarmService implements Alarmable {
String userId = mobile2UserIdCache.get(user.getPhone(), () -> { String userId = mobile2UserIdCache.get(user.getPhone(), () -> {
try { try {
return dingTalkUtils.fetchUserIdByMobile(user.getPhone()); return dingTalkUtils.fetchUserIdByMobile(user.getPhone());
} catch (OmsException ignore) { } catch (PowerJobException ignore) {
return EMPTY_TAG; return EMPTY_TAG;
} catch (Exception ignore) { } catch (Exception ignore) {
return null; return null;

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.ha;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.akka.requests.Ping; import com.github.kfcfans.powerjob.server.akka.requests.Ping;
@ -56,7 +56,7 @@ public class ServerSelectService {
// 无锁获取当前数据库中的Server // 无锁获取当前数据库中的Server
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId); Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
if (!appInfoOpt.isPresent()) { if (!appInfoOpt.isPresent()) {
throw new OmsException(appId + " is not registered!"); throw new PowerJobException(appId + " is not registered!");
} }
String appName = appInfoOpt.get().getAppName(); String appName = appInfoOpt.get().getAppName();
String originServer = appInfoOpt.get().getCurrentServer(); String originServer = appInfoOpt.get().getCurrentServer();

View File

@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.server.service.instance;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.SystemInstanceResult; import com.github.kfcfans.powerjob.common.SystemInstanceResult;
import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.model.InstanceDetail;
@ -135,11 +135,11 @@ public class InstanceService {
public void retryInstance(Long instanceId) { public void retryInstance(Long instanceId) {
InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId); InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
if (!InstanceStatus.finishedStatus.contains(instanceInfo.getStatus())) { if (!InstanceStatus.finishedStatus.contains(instanceInfo.getStatus())) {
throw new OmsException("Only stopped instance can be retry!"); throw new PowerJobException("Only stopped instance can be retry!");
} }
// 暂时不支持工作流任务的重试 // 暂时不支持工作流任务的重试
if (instanceInfo.getWfInstanceId() != null) { if (instanceInfo.getWfInstanceId() != null) {
throw new OmsException("Workflow's instance do not support retry!"); throw new PowerJobException("Workflow's instance do not support retry!");
} }
instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
@ -152,7 +152,7 @@ public class InstanceService {
// 派发任务 // 派发任务
Long jobId = instanceInfo.getJobId(); Long jobId = instanceInfo.getJobId();
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new OmsException("can't find job info by jobId: " + jobId)); JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new PowerJobException("can't find job info by jobId: " + jobId));
dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes()); dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes());
} }
@ -187,7 +187,7 @@ public class InstanceService {
log.info("[Instance-{}] cancel the instance successfully.", instanceId); log.info("[Instance-{}] cancel the instance successfully.", instanceId);
}else { }else {
log.warn("[Instance-{}] cancel the instance failed.", instanceId); log.warn("[Instance-{}] cancel the instance failed.", instanceId);
throw new OmsException("instance already up and running"); throw new PowerJobException("instance already up and running");
} }
}catch (Exception e) { }catch (Exception e) {

View File

@ -171,7 +171,7 @@ public class InstanceStatusCheckService {
waitingWfInstanceList.forEach(wfInstance -> { waitingWfInstanceList.forEach(wfInstance -> {
Optional<WorkflowInfoDO> workflowOpt = workflowInfoRepository.findById(wfInstance.getWorkflowId()); Optional<WorkflowInfoDO> workflowOpt = workflowInfoRepository.findById(wfInstance.getWorkflowId());
workflowOpt.ifPresent(workflowInfo -> { workflowOpt.ifPresent(workflowInfo -> {
workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId()); workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId(), wfInstance.getWfInitParams());
log.info("[Workflow-{}|{}] restart workflowInstance successfully~", workflowInfo.getId(), wfInstance.getWfInstanceId()); log.info("[Workflow-{}|{}] restart workflowInstance successfully~", workflowInfo.getId(), wfInstance.getWfInstanceId());
}); });
}); });

View File

@ -121,8 +121,6 @@ public class OmsScheduleService {
*/ */
private void scheduleCronJob(List<Long> appIds) { private void scheduleCronJob(List<Long> appIds) {
Date now = new Date();
long nowTime = System.currentTimeMillis(); long nowTime = System.currentTimeMillis();
long timeThreshold = nowTime + 2 * SCHEDULE_RATE; long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> { Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
@ -165,24 +163,13 @@ public class OmsScheduleService {
}); });
// 3. 计算下一次调度时间忽略5S内的重复执行即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms // 3. 计算下一次调度时间忽略5S内的重复执行即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms
List<JobInfoDO> updatedJobInfos = Lists.newLinkedList();
jobInfos.forEach(jobInfoDO -> { jobInfos.forEach(jobInfoDO -> {
try { try {
refreshJob(jobInfoDO);
Date nextTriggerTime = calculateNextTriggerTime(jobInfoDO.getNextTriggerTime(), jobInfoDO.getTimeExpression());
JobInfoDO updatedJobInfo = new JobInfoDO();
BeanUtils.copyProperties(jobInfoDO, updatedJobInfo);
updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime());
updatedJobInfo.setGmtModified(now);
updatedJobInfos.add(updatedJobInfo);
} catch (Exception e) { } catch (Exception e) {
log.error("[Job-{}] calculate next trigger time failed.", jobInfoDO.getId(), e); log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e);
} }
}); });
jobInfoRepository.saveAll(updatedJobInfos);
jobInfoRepository.flush(); jobInfoRepository.flush();
@ -203,11 +190,10 @@ public class OmsScheduleService {
return; return;
} }
Date now = new Date();
wfInfos.forEach(wfInfo -> { wfInfos.forEach(wfInfo -> {
// 1. 先生成调度记录防止不调度的情况发生 // 1. 先生成调度记录防止不调度的情况发生
Long wfInstanceId = workflowInstanceManager.create(wfInfo); Long wfInstanceId = workflowInstanceManager.create(wfInfo, null);
// 2. 推入时间轮准备调度执行 // 2. 推入时间轮准备调度执行
long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis(); long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis();
@ -215,20 +201,13 @@ public class OmsScheduleService {
log.warn("[Workflow-{}] workflow schedule delay, expect:{}, actual: {}", wfInfo.getId(), wfInfo.getNextTriggerTime(), System.currentTimeMillis()); log.warn("[Workflow-{}] workflow schedule delay, expect:{}, actual: {}", wfInfo.getId(), wfInfo.getNextTriggerTime(), System.currentTimeMillis());
delay = 0; delay = 0;
} }
InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId)); InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, null));
// 3. 重新计算下一次调度时间并更新 // 3. 重新计算下一次调度时间并更新
try { try {
Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); refreshWorkflow(wfInfo);
WorkflowInfoDO updateEntity = new WorkflowInfoDO();
BeanUtils.copyProperties(wfInfo, updateEntity);
updateEntity.setNextTriggerTime(nextTriggerTime.getTime());
updateEntity.setGmtModified(now);
workflowInfoRepository.save(updateEntity);
}catch (Exception e) { }catch (Exception e) {
log.error("[Workflow-{}] parse cron failed.", wfInfo.getId(), e); log.error("[Workflow-{}] refresh workflow failed.", wfInfo.getId(), e);
} }
}); });
workflowInfoRepository.flush(); workflowInfoRepository.flush();
@ -264,6 +243,40 @@ public class OmsScheduleService {
}); });
} }
private void refreshJob(JobInfoDO jobInfo) throws Exception {
Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression());
JobInfoDO updatedJobInfo = new JobInfoDO();
BeanUtils.copyProperties(jobInfo, updatedJobInfo);
if (nextTriggerTime == null) {
log.warn("[Job-{}] this job won't be scheduled anymore, system will set the status to DISABLE!", jobInfo.getId());
updatedJobInfo.setStatus(SwitchableStatus.DISABLE.getV());
}else {
updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime());
}
updatedJobInfo.setGmtModified(new Date());
jobInfoRepository.save(updatedJobInfo);
}
private void refreshWorkflow(WorkflowInfoDO wfInfo) throws Exception {
Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression());
WorkflowInfoDO updateEntity = new WorkflowInfoDO();
BeanUtils.copyProperties(wfInfo, updateEntity);
if (nextTriggerTime == null) {
log.warn("[Workflow-{}] this workflow won't be scheduled anymore, system will set the status to DISABLE!", wfInfo.getId());
wfInfo.setStatus(SwitchableStatus.DISABLE.getV());
}else {
updateEntity.setNextTriggerTime(nextTriggerTime.getTime());
}
updateEntity.setGmtModified(new Date());
workflowInfoRepository.save(updateEntity);
}
/** /**
* 计算下次触发时间 * 计算下次触发时间
* @param preTriggerTime 前一次触发时间 * @param preTriggerTime 前一次触发时间

View File

@ -67,9 +67,10 @@ public class WorkflowInstanceManager {
/** /**
* 创建工作流任务实例 * 创建工作流任务实例
* @param wfInfo 工作流任务元数据描述信息 * @param wfInfo 工作流任务元数据描述信息
* @param initParams 启动参数
* @return wfInstanceId * @return wfInstanceId
*/ */
public Long create(WorkflowInfoDO wfInfo) { public Long create(WorkflowInfoDO wfInfo, String initParams) {
Long wfId = wfInfo.getId(); Long wfId = wfInfo.getId();
Long wfInstanceId = idGenerateService.allocate(); Long wfInstanceId = idGenerateService.allocate();
@ -82,6 +83,7 @@ public class WorkflowInstanceManager {
newWfInstance.setWorkflowId(wfId); newWfInstance.setWorkflowId(wfId);
newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
newWfInstance.setActualTriggerTime(System.currentTimeMillis()); newWfInstance.setActualTriggerTime(System.currentTimeMillis());
newWfInstance.setWfInitParams(initParams);
newWfInstance.setGmtCreate(now); newWfInstance.setGmtCreate(now);
newWfInstance.setGmtModified(now); newWfInstance.setGmtModified(now);
@ -107,8 +109,9 @@ public class WorkflowInstanceManager {
* 开始任务 * 开始任务
* @param wfInfo 工作流任务信息 * @param wfInfo 工作流任务信息
* @param wfInstanceId 工作流任务实例ID * @param wfInstanceId 工作流任务实例ID
* @param initParams 启动参数
*/ */
public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) { public void start(WorkflowInfoDO wfInfo, Long wfInstanceId, String initParams) {
Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId); Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
if (!wfInstanceInfoOpt.isPresent()) { if (!wfInstanceInfoOpt.isPresent()) {
@ -132,13 +135,19 @@ public class WorkflowInstanceManager {
try { try {
// 构建根任务启动参数为了精简 worker 端实现启动参数仍以 instanceParams 字段承接
Map<String, String> preJobId2Result = Maps.newHashMap();
// 模拟 preJobId -> preJobResult 的格式-1 代表前置任务不存在
preJobId2Result.put("-1", initParams);
String wfRootInstanceParams = JSONObject.toJSONString(preJobId2Result);
PEWorkflowDAG peWorkflowDAG = JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); PEWorkflowDAG peWorkflowDAG = JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
List<PEWorkflowDAG.Node> roots = WorkflowDAGUtils.listRoots(peWorkflowDAG); List<PEWorkflowDAG.Node> roots = WorkflowDAGUtils.listRoots(peWorkflowDAG);
peWorkflowDAG.getNodes().forEach(node -> node.setStatus(InstanceStatus.WAITING_DISPATCH.getV())); peWorkflowDAG.getNodes().forEach(node -> node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()));
// 创建所有的根任务 // 创建所有的根任务
roots.forEach(root -> { roots.forEach(root -> {
Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis()); Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), wfRootInstanceParams, wfInstanceId, System.currentTimeMillis());
root.setInstanceId(instanceId); root.setInstanceId(instanceId);
root.setStatus(InstanceStatus.RUNNING.getV()); root.setStatus(InstanceStatus.RUNNING.getV());
@ -152,7 +161,7 @@ public class WorkflowInstanceManager {
log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId); log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId);
// 真正开始执行根任务 // 真正开始执行根任务
roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, null)); roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, wfRootInstanceParams));
}catch (Exception e) { }catch (Exception e) {
log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e); log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.workflow;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.SystemInstanceResult; import com.github.kfcfans.powerjob.common.SystemInstanceResult;
import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus; import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
@ -43,7 +43,7 @@ public class WorkflowInstanceService {
public void stopWorkflowInstance(Long wfInstanceId, Long appId) { public void stopWorkflowInstance(Long wfInstanceId, Long appId) {
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
throw new OmsException("workflow instance already stopped"); throw new PowerJobException("workflow instance already stopped");
} }
// 停止所有已启动且未完成的服务 // 停止所有已启动且未完成的服务
PEWorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); PEWorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
@ -80,7 +80,7 @@ public class WorkflowInstanceService {
public WorkflowInstanceInfoDO fetchWfInstance(Long wfInstanceId, Long appId) { public WorkflowInstanceInfoDO fetchWfInstance(Long wfInstanceId, Long appId) {
WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId)); WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId));
if (!Objects.equals(appId, wfInstance.getAppId())) { if (!Objects.equals(appId, wfInstance.getAppId())) {
throw new OmsException("Permission Denied!"); throw new PowerJobException("Permission Denied!");
} }
return wfInstance; return wfInstance;
} }

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.server.service.workflow; package com.github.kfcfans.powerjob.server.service.workflow;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.WorkflowInfoDTO; import com.github.kfcfans.powerjob.common.response.WorkflowInfoDTO;
@ -11,6 +11,7 @@ import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils; import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -42,7 +43,7 @@ public class WorkflowService {
req.valid(); req.valid();
if (!WorkflowDAGUtils.valid(req.getPEWorkflowDAG())) { if (!WorkflowDAGUtils.valid(req.getPEWorkflowDAG())) {
throw new OmsException("illegal DAG"); throw new PowerJobException("illegal DAG");
} }
Long wfId = req.getId(); Long wfId = req.getId();
@ -130,22 +131,27 @@ public class WorkflowService {
* 立即运行工作流 * 立即运行工作流
* @param wfId 工作流ID * @param wfId 工作流ID
* @param appId 所属应用ID * @param appId 所属应用ID
* @param initParams 启动参数
* @param delay 延迟时间
* @return workflow 实例的 instanceIdwfInstanceId * @return workflow 实例的 instanceIdwfInstanceId
*/ */
public Long runWorkflow(Long wfId, Long appId) { public Long runWorkflow(Long wfId, Long appId, String initParams, long delay) {
WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
Long wfInstanceId = workflowInstanceManager.create(wfInfo); Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams);
// 正式启动任务 if (delay <= 0) {
workflowInstanceManager.start(wfInfo, wfInstanceId); workflowInstanceManager.start(wfInfo, wfInstanceId, initParams);
}else {
InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, initParams));
}
return wfInstanceId; return wfInstanceId;
} }
private WorkflowInfoDO permissionCheck(Long wfId, Long appId) { private WorkflowInfoDO permissionCheck(Long wfId, Long appId) {
WorkflowInfoDO wfInfo = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id: " + wfId)); WorkflowInfoDO wfInfo = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id: " + wfId));
if (!wfInfo.getAppId().equals(appId)) { if (!wfInfo.getAppId().equals(appId)) {
throw new OmsException("Permission Denied!can't delete other appId's workflow!"); throw new PowerJobException("Permission Denied!can't delete other appId's workflow!");
} }
return wfInfo; return wfInfo;
} }

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.powerjob.server.web; package com.github.kfcfans.powerjob.server.web;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.response.ResultDTO;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.http.converter.HttpMessageNotReadableException; import org.springframework.http.converter.HttpMessageNotReadableException;
@ -25,7 +25,7 @@ public class ControllerExceptionHandler {
public ResultDTO<Void> exceptionHandler(Exception e) { public ResultDTO<Void> exceptionHandler(Exception e) {
// 不是所有异常都需要打印完整堆栈后续可以定义内部的Exception便于判断 // 不是所有异常都需要打印完整堆栈后续可以定义内部的Exception便于判断
if (e instanceof IllegalArgumentException || e instanceof OmsException) { if (e instanceof IllegalArgumentException || e instanceof PowerJobException) {
log.warn("[ControllerException] http request failed, message is {}.", e.getMessage()); log.warn("[ControllerException] http request failed, message is {}.", e.getMessage());
} else if (e instanceof HttpMessageNotReadableException || e instanceof MethodArgumentTypeMismatchException) { } else if (e instanceof HttpMessageNotReadableException || e instanceof MethodArgumentTypeMismatchException) {
log.warn("[ControllerException] invalid http request params, exception is {}.", e.getMessage()); log.warn("[ControllerException] invalid http request params, exception is {}.", e.getMessage());

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.web.controller; package com.github.kfcfans.powerjob.server.web.controller;
import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils; import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
@ -145,10 +146,10 @@ public class InstanceController {
private String getTargetServer(Long instanceId) { private String getTargetServer(Long instanceId) {
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfo == null) { if (instanceInfo == null) {
throw new RuntimeException("invalid instanceId: " + instanceId); throw new PowerJobException("invalid instanceId: " + instanceId);
} }
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(instanceInfo.getAppId()); Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(instanceInfo.getAppId());
return appInfoOpt.orElseThrow(() -> new RuntimeException("impossible")).getCurrentServer(); return appInfoOpt.orElseThrow(() -> new PowerJobException("impossible")).getCurrentServer();
} }
} }

View File

@ -150,8 +150,8 @@ public class OpenAPIController {
} }
@PostMapping(OpenAPIConstant.RUN_WORKFLOW) @PostMapping(OpenAPIConstant.RUN_WORKFLOW)
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId) { public ResultDTO<Long> runWorkflow(Long workflowId, Long appId, @RequestParam(required = false) String initParams, @RequestParam(required = false) Long delay) {
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId)); return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, initParams, delay == null ? 0 : delay));
} }
/* ************* Workflow Instance 区 ************* */ /* ************* Workflow Instance 区 ************* */

View File

@ -79,7 +79,7 @@ public class WorkflowController {
@GetMapping("/run") @GetMapping("/run")
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId) { public ResultDTO<Long> runWorkflow(Long workflowId, Long appId) {
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId)); return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, null, 0));
} }
private static PageResult<WorkflowInfoVO> convertPage(Page<WorkflowInfoDO> originPage) { private static PageResult<WorkflowInfoVO> convertPage(Page<WorkflowInfoDO> originPage) {

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.powerjob.server.web.request; package com.github.kfcfans.powerjob.server.web.request;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import lombok.Data; import lombok.Data;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -21,7 +21,7 @@ public class ModifyAppInfoRequest {
public void valid() { public void valid() {
CommonUtils.requireNonNull(appName, "appName can't be empty"); CommonUtils.requireNonNull(appName, "appName can't be empty");
if (StringUtils.containsWhitespace(appName)) { if (StringUtils.containsWhitespace(appName)) {
throw new OmsException("appName can't contains white space!"); throw new PowerJobException("appName can't contains white space!");
} }
} }
} }

View File

@ -11,7 +11,7 @@ spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置非核心依赖通过配置 oms.mongodb.enable=false 来关闭 ####### ####### mongoDB配置非核心依赖通过配置 oms.mongodb.enable=false 来关闭 #######
oms.mongodb.enable=true oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-daily
####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) ####### ####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) #######
spring.mail.host=smtp.163.com spring.mail.host=smtp.163.com

View File

@ -8,6 +8,7 @@ ${AnsiColor.GREEN}
░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████ ░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████
░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░ ░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░
${AnsiColor.BRIGHT_RED} ${AnsiColor.BRIGHT_RED}
* Maintainer: tengjiqi@gmail.com * Maintainer: tengjiqi@gmail.com & PowerJob-Team
* OfficialWebsite: http://www.powerjob.tech/
* SourceCode: https://github.com/KFCFans/PowerJob * SourceCode: https://github.com/KFCFans/PowerJob
* PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4) * PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4)

View File

@ -10,7 +10,7 @@
converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/> converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
<!-- 彩色日志格式 --> <!-- 彩色日志格式 -->
<property name="CONSOLE_LOG_PATTERN" <property name="CONSOLE_LOG_PATTERN"
value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/> value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{20}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
<!-- Console 输出设置 --> <!-- Console 输出设置 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">

View File

@ -8,6 +8,13 @@
--> -->
<property name="LOG_PATH" value="${user.home}/powerjob-server/logs"/> <property name="LOG_PATH" value="${user.home}/powerjob-server/logs"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>
<!-- 系统所有异常日志ERROR双写 start --> <!-- 系统所有异常日志ERROR双写 start -->
<appender name="ERROR_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender"> <appender name="ERROR_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/powerjob-server-error.log</file> <file>${LOG_PATH}/powerjob-server-error.log</file>
@ -16,7 +23,7 @@
<MaxHistory>7</MaxHistory> <MaxHistory>7</MaxHistory>
</rollingPolicy> </rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{20} - %msg%n</pattern>
<charset>UTF-8</charset> <charset>UTF-8</charset>
</encoder> </encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter"> <filter class="ch.qos.logback.classic.filter.LevelFilter">
@ -53,7 +60,7 @@
<MaxHistory>7</MaxHistory> <MaxHistory>7</MaxHistory>
</rollingPolicy> </rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{20} - %msg%n</pattern>
<charset>UTF-8</charset> <charset>UTF-8</charset>
</encoder> </encoder>
<append>true</append> <append>true</append>
@ -61,6 +68,7 @@
<!-- 系统主日志 日志 end --> <!-- 系统主日志 日志 end -->
<root level="INFO"> <root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="DEFAULT_APPENDER"/> <appender-ref ref="DEFAULT_APPENDER"/>
<appender-ref ref="ERROR_APPENDER"/> <appender-ref ref="ERROR_APPENDER"/>
</root> </root>

View File

@ -0,0 +1,26 @@
package com.github.kfcfans.powerjob.server.test;
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import org.junit.Test;
import java.util.Date;
/**
* CRON 测试
*
* @author tjq
* @since 2020/10/8
*/
public class CronTest {
private static final String FIXED_CRON = "0 0 13 8 10 ? 2020-2020";
@Test
public void testFixedTimeCron() throws Exception {
CronExpression cronExpression = new CronExpression(FIXED_CRON);
System.out.println(cronExpression.getCronExpression());
System.out.println(cronExpression.getNextValidTimeAfter(new Date()));
}
}

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId> <artifactId>powerjob-worker-agent</artifactId>
<version>3.2.3</version> <version>3.3.0</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<powerjob.worker.version>3.2.3</powerjob.worker.version> <powerjob.worker.version>3.3.0</powerjob.worker.version>
<logback.version>1.2.3</logback.version> <logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version> <picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId> <artifactId>powerjob-worker-samples</artifactId>
<version>3.2.3</version> <version>3.3.0</version>
<properties> <properties>
<springboot.version>2.2.6.RELEASE</springboot.version> <springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.starter.version>3.2.3</powerjob.worker.starter.version> <powerjob.worker.starter.version>3.3.0</powerjob.worker.starter.version>
<fastjson.version>1.2.68</fastjson.version> <fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module --> <!-- 部署时跳过该module -->

View File

@ -2,12 +2,14 @@ server.port=8081
spring.jpa.open-in-view=false spring.jpa.open-in-view=false
########### powerjob-worker 配置 ########### ########### powerjob-worker 配置(老配置 powerjob.xxx 即将废弃,请使用 powerjob.worker.xxx ###########
# akka 工作端口,可选,默认 27777 # akka 工作端口,可选,默认 27777
powerjob.akka-port=27777 powerjob.worker.akka-port=27777
# 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称 # 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
powerjob.app-name=powerjob-agent-test powerjob.worker.app-name=powerjob-agent-test
# 调度服务器地址IP:Port 或 域名,多值逗号分隔 # 调度服务器地址IP:Port 或 域名,多值逗号分隔
powerjob.server-address=127.0.0.1:7700,127.0.0.1:7701 powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701
# 持久化方式,可选,默认 disk # 持久化方式,可选,默认 disk
powerjob.store-strategy=disk powerjob.worker.store-strategy=disk
# 返回值最大长度,默认 8096
powerjob.worker.max-result-length=4096

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId> <artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>3.2.3</version> <version>3.3.0</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<powerjob.worker.version>3.2.3</powerjob.worker.version> <powerjob.worker.version>3.3.0</powerjob.worker.version>
<springboot.version>2.2.6.RELEASE</springboot.version> <springboot.version>2.2.6.RELEASE</springboot.version>
</properties> </properties>

View File

@ -3,9 +3,12 @@ package com.github.kfcfans.powerjob.worker.autoconfigure;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig; import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import java.util.Arrays; import java.util.Arrays;
@ -19,32 +22,53 @@ import java.util.List;
*/ */
@Configuration @Configuration
@EnableConfigurationProperties(PowerJobProperties.class) @EnableConfigurationProperties(PowerJobProperties.class)
@Conditional(PowerJobAutoConfiguration.PowerJobWorkerCondition.class)
public class PowerJobAutoConfiguration { public class PowerJobAutoConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
public OhMyWorker initPowerJob(PowerJobProperties properties) { public OhMyWorker initPowerJob(PowerJobProperties properties) {
PowerJobProperties.Worker worker = properties.getWorker();
// 服务器HTTP地址端口号为 server.port而不是 ActorSystem port请勿添加任何前缀http:// // 服务器HTTP地址端口号为 server.port而不是 ActorSystem port请勿添加任何前缀http://
CommonUtils.requireNonNull(properties.getServerAddress(), "serverAddress can't be empty!"); CommonUtils.requireNonNull(worker.getServerAddress(), "serverAddress can't be empty!");
List<String> serverAddress = Arrays.asList(properties.getServerAddress().split(",")); List<String> serverAddress = Arrays.asList(worker.getServerAddress().split(","));
// 1. 创建配置文件 // 1. 创建配置文件
OhMyConfig config = new OhMyConfig(); OhMyConfig config = new OhMyConfig();
// 可以不显式设置默认值 27777 // 可以不显式设置默认值 27777
config.setPort(properties.getAkkaPort()); config.setPort(worker.getAkkaPort());
// appName需要提前在控制台注册否则启动报错 // appName需要提前在控制台注册否则启动报错
config.setAppName(properties.getAppName()); config.setAppName(worker.getAppName());
config.setServerAddress(serverAddress); config.setServerAddress(serverAddress);
// 如果没有大型 Map/MapReduce 的需求建议使用内存来加速计算 // 如果没有大型 Map/MapReduce 的需求建议使用内存来加速计算
// 有大型 Map/MapReduce 需求可能产生大量子任务Task的场景请使用 DISK否则妥妥的 OutOfMemory // 有大型 Map/MapReduce 需求可能产生大量子任务Task的场景请使用 DISK否则妥妥的 OutOfMemory
config.setStoreStrategy(properties.getStoreStrategy()); config.setStoreStrategy(worker.getStoreStrategy());
// 启动测试模式true情况下不再尝试连接 server 并验证appName // 启动测试模式true情况下不再尝试连接 server 并验证appName
config.setEnableTestMode(properties.isEnableTestMode()); config.setEnableTestMode(worker.isEnableTestMode());
// 2. 创建 Worker 对象设置配置文件 // 2. 创建 Worker 对象设置配置文件
OhMyWorker ohMyWorker = new OhMyWorker(); OhMyWorker ohMyWorker = new OhMyWorker();
ohMyWorker.setConfig(config); ohMyWorker.setConfig(config);
return ohMyWorker; return ohMyWorker;
} }
static class PowerJobWorkerCondition extends AnyNestedCondition {
public PowerJobWorkerCondition() {
super(ConfigurationPhase.PARSE_CONFIGURATION);
}
@Deprecated
@ConditionalOnProperty(prefix = "powerjob", name = "server-address")
static class PowerJobProperty {
}
@ConditionalOnProperty(prefix = "powerjob.worker", name = "server-address")
static class PowerJobWorkerProperty {
}
}
} }

View File

@ -3,8 +3,10 @@ package com.github.kfcfans.powerjob.worker.autoconfigure;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import lombok.Data; import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
/** /**
* PowerJob 配置项 * PowerJob 配置项
@ -12,9 +14,89 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
* @author songyinyin * @author songyinyin
* @since 2020/7/26 16:37 * @since 2020/7/26 16:37
*/ */
@Data
@ConfigurationProperties(prefix = "powerjob") @ConfigurationProperties(prefix = "powerjob")
public class PowerJobProperties { public class PowerJobProperties {
private final Worker worker = new Worker();
public Worker getWorker() {
return worker;
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.app-name")
public String getAppName() {
return getWorker().appName;
}
@Deprecated
public void setAppName(String appName) {
getWorker().setAppName(appName);
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.akka-port")
public int getAkkaPort() {
return getWorker().akkaPort;
}
@Deprecated
public void setAkkaPort(int akkaPort) {
getWorker().setAkkaPort(akkaPort);
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.server-address")
public String getServerAddress() {
return getWorker().serverAddress;
}
@Deprecated
public void setServerAddress(String serverAddress) {
getWorker().setServerAddress(serverAddress);
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.store-strategy")
public StoreStrategy getStoreStrategy() {
return getWorker().storeStrategy;
}
@Deprecated
public void setStoreStrategy(StoreStrategy storeStrategy) {
getWorker().setStoreStrategy(storeStrategy);
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.max-result-length")
public int getMaxResultLength() {
return getWorker().maxResultLength;
}
@Deprecated
public void setMaxResultLength(int maxResultLength) {
getWorker().setMaxResultLength(maxResultLength);
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.enable-test-mode")
public boolean isEnableTestMode() {
return getWorker().enableTestMode;
}
@Deprecated
public void setEnableTestMode(boolean enableTestMode) {
getWorker().setEnableTestMode(enableTestMode);
}
/**
* 客户端 配置项
*/
@Setter
@Getter
public static class Worker {
/** /**
* 应用名称需要提前在控制台注册否则启动报错 * 应用名称需要提前在控制台注册否则启动报错
*/ */
@ -42,3 +124,4 @@ public class PowerJobProperties {
*/ */
private boolean enableTestMode = false; private boolean enableTestMode = false;
} }
}

View File

@ -4,46 +4,106 @@
"name": "powerjob", "name": "powerjob",
"type": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", "type": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
},
{
"name": "powerjob.worker",
"type": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"sourceMethod": "getWorker()"
} }
], ],
"properties": [ "properties": [
{ {
"name": "powerjob.app-name", "name": "powerjob.worker.akka-port",
"type": "java.lang.String", "type": "java.lang.Integer",
"description": "应用名称,需要提前在控制台注册,否则启动报错", "description": "启动 akka 端口",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
}, },
{ {
"name": "powerjob.max-result-length", "name": "powerjob.worker.app-name",
"type": "java.lang.String",
"description": "应用名称,需要提前在控制台注册,否则启动报错",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
},
{
"name": "powerjob.worker.enable-test-mode",
"type": "java.lang.Boolean",
"description": "启动测试模式true情况下不再尝试连接 server 并验证appName。 true -> 用于本地写单元测试调试; false -> 默认值,标准模式",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker",
"defaultValue": false
},
{
"name": "powerjob.worker.max-result-length",
"type": "java.lang.Integer", "type": "java.lang.Integer",
"description": "最大返回值长度,超过会被截断 {@link ProcessResult}#msg 的最大长度", "description": "最大返回值长度,超过会被截断 {@link ProcessResult}#msg 的最大长度",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker",
"defaultValue": 8096 "defaultValue": 8096
}, },
{
"name": "powerjob.worker.server-address",
"type": "java.lang.String",
"description": "调度服务器地址ip:port 或 域名,多个用英文逗号分隔",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
},
{
"name": "powerjob.worker.store-strategy",
"type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
"description": "本地持久化方式,默认使用磁盘",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
},
{ {
"name": "powerjob.akka-port", "name": "powerjob.akka-port",
"type": "java.lang.Integer", "type": "java.lang.Integer",
"description": "启动 akka 端口", "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" "deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.akka-port"
}
}, },
{ {
"name": "powerjob.server-address", "name": "powerjob.app-name",
"type": "java.lang.String", "type": "java.lang.String",
"description": "调度服务器地址ip:port 或 域名,多值用英文逗号分隔", "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" "deprecated": true,
}, "deprecation": {
{ "replacement": "powerjob.worker.app-name"
"name": "powerjob.store-strategy", }
"type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
"description": "本地持久化方式,默认使用磁盘",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
}, },
{ {
"name": "powerjob.enable-test-mode", "name": "powerjob.enable-test-mode",
"type": "java.lang.Boolean", "type": "java.lang.Boolean",
"description": "启动测试模式true情况下不再尝试连接 server 并验证appName。true -> 用于本地写单元测试调试; false -> 默认值,标准模式",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"defaultValue": false "deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.enable-test-mode"
}
},
{
"name": "powerjob.max-result-length",
"type": "java.lang.Integer",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.max-result-length"
}
},
{
"name": "powerjob.server-address",
"type": "java.lang.String",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.server-address"
}
},
{
"name": "powerjob.store-strategy",
"type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.store-strategy"
}
} }
], ],
"hints": [] "hints": []

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId> <artifactId>powerjob-worker</artifactId>
<version>3.2.3</version> <version>3.3.0</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<spring.version>5.2.4.RELEASE</spring.version> <spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.2.3</powerjob.common.version> <powerjob.common.version>3.3.0</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version> <h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version> <hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version> <junit.version>5.6.1</junit.version>

View File

@ -5,7 +5,7 @@ import akka.actor.ActorSystem;
import akka.actor.DeadLetter; import akka.actor.DeadLetter;
import akka.actor.Props; import akka.actor.Props;
import akka.routing.RoundRobinPool; import akka.routing.RoundRobinPool;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
@ -163,16 +163,16 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
return appId; return appId;
}else { }else {
log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName); log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
throw new OmsException(resultDTO.getMessage()); throw new PowerJobException(resultDTO.getMessage());
} }
}catch (OmsException oe) { }catch (PowerJobException oe) {
throw oe; throw oe;
}catch (Exception ignore) { }catch (Exception ignore) {
log.warn("[OhMyWorker] assert appName by url({}) failed, please check the server address.", realUrl); log.warn("[OhMyWorker] assert appName by url({}) failed, please check the server address.", realUrl);
} }
} }
log.error("[OhMyWorker] no available server in {}.", config.getServerAddress()); log.error("[OhMyWorker] no available server in {}.", config.getServerAddress());
throw new OmsException("no server available!"); throw new PowerJobException("no server available!");
} }
@Override @Override

View File

@ -11,21 +11,28 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public final class OmsBannerPrinter { public final class OmsBannerPrinter {
private static final String BANNER = "\n ███████ ██ ██ \n" + private static final String BANNER = "" +
"\n" +
" ███████ ██ ██\n" +
"░██░░░░██ ░██ ░██\n" + "░██░░░░██ ░██ ░██\n" +
"░██ ░██ ██████ ███ ██ █████ ██████ ░██ ██████ ░██\n" + "░██ ░██ ██████ ███ ██ █████ ██████ ░██ ██████ ░██\n" +
"░███████ ██░░░░██░░██ █ ░██ ██░░░██░░██░░█ ░██ ██░░░░██░██████\n" + "░███████ ██░░░░██░░██ █ ░██ ██░░░██░░██░░█ ░██ ██░░░░██░██████\n" +
"░██░░░░ ░██ ░██ ░██ ███░██░███████ ░██ ░ ░██░██ ░██░██░░░██\n" + "░██░░░░ ░██ ░██ ░██ ███░██░███████ ░██ ░ ░██░██ ░██░██░░░██\n" +
"░██ ░██ ░██ ░████░████░██░░░░ ░██ ██ ░██░██ ░██░██ ░██\n" + "░██ ░██ ░██ ░████░████░██░░░░ ░██ ██ ░██░██ ░██░██ ░██\n" +
"░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████\n" + "░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████\n" +
"░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░ \n"; "░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░\n" +
"\n" +
"* Maintainer: tengjiqi@gmail.com & PowerJob-Team\n" +
"* OfficialWebsite: http://www.powerjob.tech/\n" +
"* SourceCode: https://github.com/KFCFans/PowerJob\n" +
"\n";
public static void print() { public static void print() {
log.info(BANNER); log.info(BANNER);
String version = OmsWorkerVersion.getVersion(); String version = OmsWorkerVersion.getVersion();
version = (version != null) ? " (v" + version + ")" : ""; version = (version != null) ? " (v" + version + ")" : "";
log.info(":: OhMyScheduler Worker :: {}", version); log.info(":: PowerJob Worker :: {}", version);
} }
} }

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.worker.container; package com.github.kfcfans.powerjob.worker.container;
import com.github.kfcfans.powerjob.common.ContainerConstant; import com.github.kfcfans.powerjob.common.ContainerConstant;
import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -106,7 +106,7 @@ public class OmsJarContainer implements OmsContainer {
if (propertiesURLStream == null) { if (propertiesURLStream == null) {
log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath()); log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath());
throw new OmsException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME); throw new PowerJobException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME);
} }
properties.load(propertiesURLStream); properties.load(propertiesURLStream);
@ -115,7 +115,7 @@ public class OmsJarContainer implements OmsContainer {
String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY); String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY);
if (StringUtils.isEmpty(packageName)) { if (StringUtils.isEmpty(packageName)) {
log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId); log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId);
throw new OmsException("invalid jar file"); throw new PowerJobException("invalid jar file");
} }
// 加载用户类 // 加载用户类

View File

@ -318,12 +318,12 @@ public class ProcessorTracker {
break; break;
default: default:
log.warn("[ProcessorTracker-{}] unknown processor type: {}.", instanceId, processorType); log.warn("[ProcessorTracker-{}] unknown processor type: {}.", instanceId, processorType);
throw new OmsException("unknown processor type of " + processorType); throw new PowerJobException("unknown processor type of " + processorType);
} }
if (processor == null) { if (processor == null) {
log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo); log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo);
throw new OmsException("fetch Processor failed"); throw new PowerJobException("fetch Processor failed");
} }
} }

View File

@ -110,7 +110,7 @@ public class CommonTaskTracker extends TaskTracker {
log.info("[TaskTracker-{}] create root task successfully.", instanceId); log.info("[TaskTracker-{}] create root task successfully.", instanceId);
}else { }else {
log.error("[TaskTracker-{}] create root task failed.", instanceId); log.error("[TaskTracker-{}] create root task failed.", instanceId);
throw new OmsException("create root task failed for instance: " + instanceId); throw new PowerJobException("create root task failed for instance: " + instanceId);
} }
} }

View File

@ -91,7 +91,7 @@ public class FrequentTaskTracker extends TaskTracker {
if (timeExpressionType == TimeExpressionType.FIX_RATE) { if (timeExpressionType == TimeExpressionType.FIX_RATE) {
// 固定频率需要设置最小间隔 // 固定频率需要设置最小间隔
if (timeParams < MIN_INTERVAL) { if (timeParams < MIN_INTERVAL) {
throw new OmsException("time interval too small, please set the timeExpressionInfo >= 1000"); throw new PowerJobException("time interval too small, please set the timeExpressionInfo >= 1000");
} }
scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS); scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS);
}else { }else {
@ -123,6 +123,9 @@ public class FrequentTaskTracker extends TaskTracker {
history.add(subDetail); history.add(subDetail);
}); });
// subInstanceId 排序 issue#63
history.sort((o1, o2) -> (int) (o2.getSubInstanceId() - o1.getSubInstanceId()));
detail.setSubInstanceDetails(history); detail.setSubInstanceDetails(history);
return detail; return detail;
} }