Merge branch 'v3.3.2' into jenkins_auto_build

This commit is contained in:
“tjq” 2020-11-07 22:46:58 +08:00
commit 20a904f4e6
29 changed files with 325 additions and 117 deletions

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.3.1</version>
<version>3.3.2</version>
<packaging>jar</packaging>
<properties>
<junit.version>5.6.1</junit.version>
<fastjson.version>1.2.68</fastjson.version>
<powerjob.common.version>3.3.1</powerjob.common.version>
<powerjob.common.version>3.3.2</powerjob.common.version>
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
</properties>

View File

@ -2,13 +2,14 @@ package com.github.kfcfans.powerjob.client;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.OpenAPIConstant;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.*;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import okhttp3.FormBody;
@ -20,6 +21,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Objects;
import static com.github.kfcfans.powerjob.client.TypeStore.*;
/**
* OpenAPI 客户端
*
@ -27,7 +30,6 @@ import java.util.Objects;
* @since 2020/4/15
*/
@Slf4j
@SuppressWarnings("rawtypes, unchecked")
public class OhMyClient {
private Long appId;
@ -62,9 +64,9 @@ public class OhMyClient {
try {
String result = assertApp(appName, password, url);
if (StringUtils.isNotEmpty(result)) {
ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class);
ResultDTO<Long> resultDTO = JSONObject.parseObject(result, LONG_RESULT_TYPE);
if (resultDTO.isSuccess()) {
appId = Long.parseLong(resultDTO.getData().toString());
appId = resultDTO.getData();
currentAddress = addr;
break;
}else {
@ -101,75 +103,75 @@ public class OhMyClient {
* 保存任务包括创建与修改
* @param request 任务详细参数
* @return 创建的任务ID
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Long> saveJob(SaveJobInfoRequest request) throws Exception {
public ResultDTO<Long> saveJob(SaveJobInfoRequest request) throws PowerJobException {
request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JSONObject.toJSONString(request);
String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json));
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, LONG_RESULT_TYPE);
}
/**
* 根据 jobId 查询任务信息
* @param jobId 任务ID
* @return 任务详细信息
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<JobInfoDTO> fetchJob(Long jobId) throws Exception {
public ResultDTO<JobInfoDTO> fetchJob(Long jobId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_JOB, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, JOB_RESULT_TYPE);
}
/**
* 禁用某个任务
* @param jobId 任务ID
* @return 标准返回对象
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Void> disableJob(Long jobId) throws Exception {
public ResultDTO<Void> disableJob(Long jobId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DISABLE_JOB, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
* 启用某个任务
* @param jobId 任务ID
* @return 标准返回对象
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Void> enableJob(Long jobId) throws Exception {
public ResultDTO<Void> enableJob(Long jobId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.ENABLE_JOB, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
* 删除某个任务
* @param jobId 任务ID
* @return 标准返回对象
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Void> deleteJob(Long jobId) throws Exception {
public ResultDTO<Void> deleteJob(Long jobId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DELETE_JOB, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
@ -178,9 +180,9 @@ public class OhMyClient {
* @param instanceParams 任务实例的参数
* @param delayMS 延迟时间单位毫秒
* @return 任务实例IDinstanceId
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Long> runJob(Long jobId, String instanceParams, long delayMS) throws Exception {
public ResultDTO<Long> runJob(Long jobId, String instanceParams, long delayMS) throws PowerJobException {
FormBody.Builder builder = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
@ -190,9 +192,9 @@ public class OhMyClient {
builder.add("instanceParams", instanceParams);
}
String post = postHA(OpenAPIConstant.RUN_JOB, builder.build());
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, LONG_RESULT_TYPE);
}
public ResultDTO<Long> runJob(Long jobId) throws Exception {
public ResultDTO<Long> runJob(Long jobId) throws PowerJobException {
return runJob(jobId, null, 0);
}
@ -201,15 +203,15 @@ public class OhMyClient {
* 停止应用实例
* @param instanceId 应用实例ID
* @return true 停止成功false 停止失败
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Void> stopInstance(Long instanceId) throws Exception {
public ResultDTO<Void> stopInstance(Long instanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.STOP_INSTANCE, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
@ -217,15 +219,15 @@ public class OhMyClient {
* 接口使用条件调用接口时间与待取消任务的预计执行时间有一定时间间隔否则不保证可靠性
* @param instanceId 任务实例ID
* @return true 代表取消成功false 取消失败
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Void> cancelInstance(Long instanceId) throws Exception {
public ResultDTO<Void> cancelInstance(Long instanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.CANCEL_INSTANCE, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
@ -233,43 +235,43 @@ public class OhMyClient {
* 只有完成状态成功失败手动停止被取消的任务才能被重试且暂不支持工作流内任务实例的重试
* @param instanceId 任务实例ID
* @return true 代表取消成功false 取消失败
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Void> retryInstance(Long instanceId) throws Exception {
public ResultDTO<Void> retryInstance(Long instanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.RETRY_INSTANCE, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
* 查询任务实例状态
* @param instanceId 应用实例ID
* @return {@link InstanceStatus} 的枚举值
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Integer> fetchInstanceStatus(Long instanceId) throws Exception {
public ResultDTO<Integer> fetchInstanceStatus(Long instanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_INSTANCE_STATUS, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, INTEGER_RESULT_TYPE);
}
/**
* 查询任务实例的信息
* @param instanceId 任务实例ID
* @return 任务实例信息
* @throws Exception 潜在的异常
* @throws PowerJobException 潜在的异常
*/
public ResultDTO<InstanceInfoDTO> fetchInstanceInfo(Long instanceId) throws Exception {
public ResultDTO<InstanceInfoDTO> fetchInstanceInfo(Long instanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_INSTANCE_INFO, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, INSTANCE_RESULT_TYPE);
}
/* ************* Workflow 区 ************* */
@ -277,74 +279,75 @@ public class OhMyClient {
* 保存工作流包括创建和修改
* @param request 创建/修改 Workflow 请求
* @return 工作流ID
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Long> saveWorkflow(SaveWorkflowRequest request) throws Exception {
public ResultDTO<Long> saveWorkflow(SaveWorkflowRequest request) throws PowerJobException {
request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JSONObject.toJSONString(request);
// 中坑记录 FastJSON 序列化会导致 Server 接收时 pEWorkflowDAG null无语.jpg
String json = JsonUtils.toJSONStringUnsafe(request);
String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json));
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, LONG_RESULT_TYPE);
}
/**
* 根据 workflowId 查询工作流信息
* @param workflowId workflowId
* @return 工作流信息
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<WorkflowInfoDTO> fetchWorkflow(Long workflowId) throws Exception {
public ResultDTO<WorkflowInfoDTO> fetchWorkflow(Long workflowId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_WORKFLOW, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, WF_RESULT_TYPE);
}
/**
* 禁用某个工作流
* @param workflowId 工作流ID
* @return 标准返回对象
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Void> disableWorkflow(Long workflowId) throws Exception {
public ResultDTO<Void> disableWorkflow(Long workflowId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DISABLE_WORKFLOW, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
* 启用某个工作流
* @param workflowId workflowId
* @return 标准返回对象
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Void> enableWorkflow(Long workflowId) throws Exception {
public ResultDTO<Void> enableWorkflow(Long workflowId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.ENABLE_WORKFLOW, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
* 删除某个工作流
* @param workflowId workflowId
* @return 标准返回对象
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Void> deleteWorkflow(Long workflowId) throws Exception {
public ResultDTO<Void> deleteWorkflow(Long workflowId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DELETE_WORKFLOW, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
@ -353,9 +356,9 @@ public class OhMyClient {
* @param initParams 启动参数
* @param delayMS 延迟时间单位毫秒 ms
* @return 工作流实例ID
* @throws Exception 异常信息
* @throws PowerJobException 异常信息
*/
public ResultDTO<Long> runWorkflow(Long workflowId, String initParams, long delayMS) throws Exception {
public ResultDTO<Long> runWorkflow(Long workflowId, String initParams, long delayMS) throws PowerJobException {
FormBody.Builder builder = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
@ -364,9 +367,9 @@ public class OhMyClient {
builder.add("initParams", initParams);
}
String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build());
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, LONG_RESULT_TYPE);
}
public ResultDTO<Long> runWorkflow(Long workflowId) throws Exception {
public ResultDTO<Long> runWorkflow(Long workflowId) throws PowerJobException {
return runWorkflow(workflowId, null, 0);
}
@ -375,30 +378,30 @@ public class OhMyClient {
* 停止应用实例
* @param wfInstanceId 工作流实例ID
* @return true 停止成功 false 停止失败
* @throws Exception 异常
* @throws PowerJobException 异常
*/
public ResultDTO<Void> stopWorkflowInstance(Long wfInstanceId) throws Exception {
public ResultDTO<Void> stopWorkflowInstance(Long wfInstanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("wfInstanceId", wfInstanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.STOP_WORKFLOW_INSTANCE, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
* 查询任务实例的信息
* @param wfInstanceId 任务实例ID
* @return 任务实例信息
* @throws Exception 潜在的异常
* @throws PowerJobException 潜在的异常
*/
public ResultDTO<WorkflowInstanceInfoDTO> fetchWorkflowInstanceInfo(Long wfInstanceId) throws Exception {
public ResultDTO<WorkflowInstanceInfoDTO> fetchWorkflowInstanceInfo(Long wfInstanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("wfInstanceId", wfInstanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, WF_INSTANCE_RESULT_TYPE);
}
@ -412,7 +415,7 @@ public class OhMyClient {
if (StringUtils.isNotEmpty(res)) {
return res;
}
}catch (Exception e) {
}catch (IOException e) {
log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString());
}
@ -429,7 +432,7 @@ public class OhMyClient {
currentAddress = addr;
return res;
}
}catch (Exception e) {
}catch (IOException e) {
log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString());
}
}

View File

@ -0,0 +1,28 @@
package com.github.kfcfans.powerjob.client;
import com.alibaba.fastjson.TypeReference;
import com.github.kfcfans.powerjob.common.response.*;
/**
* 类型工厂
*
* @author tjq
* @since 11/7/20
*/
public class TypeStore {
public static final TypeReference<ResultDTO<Void>> VOID_RESULT_TYPE = new TypeReference<ResultDTO<Void>>(){};
public static final TypeReference<ResultDTO<Integer>> INTEGER_RESULT_TYPE = new TypeReference<ResultDTO<Integer>>(){};
public static final TypeReference<ResultDTO<Long>> LONG_RESULT_TYPE = new TypeReference<ResultDTO<Long>>(){};
public static final TypeReference<ResultDTO<JobInfoDTO>> JOB_RESULT_TYPE = new TypeReference<ResultDTO<JobInfoDTO>>(){};
public static final TypeReference<ResultDTO<InstanceInfoDTO>> INSTANCE_RESULT_TYPE = new TypeReference<ResultDTO<InstanceInfoDTO>>() {};
public static final TypeReference<ResultDTO<WorkflowInfoDTO>> WF_RESULT_TYPE = new TypeReference<ResultDTO<WorkflowInfoDTO>>() {};
public static final TypeReference<ResultDTO<WorkflowInstanceInfoDTO>> WF_INSTANCE_RESULT_TYPE = new TypeReference<ResultDTO<WorkflowInstanceInfoDTO>>() {};
}

View File

@ -21,6 +21,8 @@ public class TestClient {
private static OhMyClient ohMyClient;
public static final long JOB_ID = 4L;
@BeforeAll
public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
@ -30,7 +32,7 @@ public class TestClient {
public void testSaveJob() throws Exception {
SaveJobInfoRequest newJobInfo = new SaveJobInfoRequest();
// newJobInfo.setId(8L);
newJobInfo.setId(JOB_ID);
newJobInfo.setJobName("omsOpenAPIJobccccc");
newJobInfo.setJobDescription("tes OpenAPI");
newJobInfo.setJobParams("{'aa':'bb'}");
@ -38,8 +40,8 @@ public class TestClient {
newJobInfo.setTimeExpression("0 0 * * * ? ");
newJobInfo.setExecuteType(ExecuteType.STANDALONE);
newJobInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA);
newJobInfo.setProcessorInfo("com.github.kfcfans.oms.server.tester.OmsLogPerformanceTester");
newJobInfo.setDesignatedWorkers("192.168.1.1:2777");
newJobInfo.setProcessorInfo("com.github.kfcfans.powerjob.samples.processors.StandaloneProcessorDemo");
newJobInfo.setDesignatedWorkers("");
newJobInfo.setMinCpuCores(1.1);
newJobInfo.setMinMemorySpace(1.2);
@ -51,48 +53,53 @@ public class TestClient {
@Test
public void testFetchJob() throws Exception {
ResultDTO<JobInfoDTO> fetchJob = ohMyClient.fetchJob(1L);
ResultDTO<JobInfoDTO> fetchJob = ohMyClient.fetchJob(JOB_ID);
System.out.println(JSONObject.toJSONString(fetchJob));
}
@Test
public void testDisableJob() throws Exception {
System.out.println(ohMyClient.disableJob(7L));
System.out.println(ohMyClient.disableJob(JOB_ID));
}
@Test
public void testEnableJob() throws Exception {
System.out.println(ohMyClient.enableJob(7L));
System.out.println(ohMyClient.enableJob(JOB_ID));
}
@Test
public void testDeleteJob() throws Exception {
System.out.println(ohMyClient.deleteJob(7L));
System.out.println(ohMyClient.deleteJob(JOB_ID));
}
@Test
public void testRunJob() throws Exception {
System.out.println(ohMyClient.runJob(6L, "this is instanceParams", 60000));
public void testRun() {
System.out.println(ohMyClient.runJob(JOB_ID));
}
@Test
public void testRunJobDelay() throws Exception {
System.out.println(ohMyClient.runJob(JOB_ID, "this is instanceParams", 60000));
}
@Test
public void testFetchInstanceInfo() throws Exception {
System.out.println(ohMyClient.fetchInstanceInfo(141251409466097728L));
System.out.println(ohMyClient.fetchInstanceInfo(205436386851946560L));
}
@Test
public void testStopInstance() throws Exception {
ResultDTO<Void> res = ohMyClient.stopInstance(141251409466097728L);
ResultDTO<Void> res = ohMyClient.stopInstance(205436995885858880L);
System.out.println(res.toString());
}
@Test
public void testFetchInstanceStatus() throws Exception {
System.out.println(ohMyClient.fetchInstanceStatus(141251409466097728L));
System.out.println(ohMyClient.fetchInstanceStatus(205436995885858880L));
}
@Test
public void testCancelInstanceInTimeWheel() throws Exception {
ResultDTO<Long> startRes = ohMyClient.runJob(15L, "start by OhMyClient", 20000);
ResultDTO<Long> startRes = ohMyClient.runJob(JOB_ID, "start by OhMyClient", 20000);
System.out.println("runJob result: " + JSONObject.toJSONString(startRes));
ResultDTO<Void> cancelRes = ohMyClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JSONObject.toJSONString(cancelRes));

View File

@ -22,6 +22,8 @@ public class TestWorkflow {
private static OhMyClient ohMyClient;
private static final long WF_ID = 1;
@BeforeAll
public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
@ -64,32 +66,33 @@ public class TestWorkflow {
req.setEnable(true);
req.setTimeExpressionType(TimeExpressionType.API);
System.out.println("req ->" + JSONObject.toJSON(req));
System.out.println(ohMyClient.saveWorkflow(req));
}
@Test
public void testDisableWorkflow() throws Exception {
System.out.println(ohMyClient.disableWorkflow(4L));
System.out.println(ohMyClient.disableWorkflow(WF_ID));
}
@Test
public void testDeleteWorkflow() throws Exception {
System.out.println(ohMyClient.deleteWorkflow(4L));
System.out.println(ohMyClient.deleteWorkflow(WF_ID));
}
@Test
public void testEnableWorkflow() throws Exception {
System.out.println(ohMyClient.enableWorkflow(4L));
System.out.println(ohMyClient.enableWorkflow(WF_ID));
}
@Test
public void testFetchWorkflowInfo() throws Exception {
System.out.println(ohMyClient.fetchWorkflow(5L));
System.out.println(ohMyClient.fetchWorkflow(WF_ID));
}
@Test
public void testRunWorkflow() throws Exception {
System.out.println(ohMyClient.runWorkflow(5L));
System.out.println(ohMyClient.runWorkflow(WF_ID));
}
@Test
@ -104,6 +107,6 @@ public class TestWorkflow {
@Test
public void testRunWorkflowPlus() throws Exception {
System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 2", 90000));
System.out.println(ohMyClient.runWorkflow(WF_ID, "this is init Params 2", 90000));
}
}

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.3.1</version>
<version>3.3.2</version>
<packaging>jar</packaging>
<properties>

View File

@ -33,5 +33,5 @@ public class RemoteConstant {
/* ************************ OTHERS ************************ */
public static final String EMPTY_ADDRESS = "N/A";
public static final long DEFAULT_TIMEOUT_MS = 3000;
public static final long DEFAULT_TIMEOUT_MS = 5000;
}

View File

@ -28,6 +28,8 @@ public class InstanceDetail implements OmsSerializable {
private String result;
// TaskTracker地址
private String taskTrackerAddress;
// 启动参数
private String instanceParams;
// MR或BD任务专用
private TaskDetail taskDetail;

View File

@ -9,6 +9,7 @@ import lombok.NoArgsConstructor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.List;
/**
@ -20,7 +21,7 @@ import java.util.List;
*/
@Data
@NoArgsConstructor
public class PEWorkflowDAG {
public class PEWorkflowDAG implements Serializable {
// DAG 点线表示法
private List<Node> nodes;
@ -30,7 +31,7 @@ public class PEWorkflowDAG {
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Node {
public static class Node implements Serializable {
private Long jobId;
private String jobName;
@ -50,7 +51,7 @@ public class PEWorkflowDAG {
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Edge {
public static class Edge implements Serializable {
private Long from;
private Long to;
}

View File

@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.google.common.collect.Lists;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
@ -15,7 +16,7 @@ import java.util.List;
* @since 2020/5/26
*/
@Data
public class SaveWorkflowRequest {
public class SaveWorkflowRequest implements Serializable {
private Long id;

View File

@ -53,4 +53,8 @@ public class AskResponse implements OmsSerializable {
return JsonUtils.parseObject(data, clz);
}
public String parseDataAsString() {
return new String(data, StandardCharsets.UTF_8);
}
}

View File

@ -21,6 +21,8 @@ public class WorkflowInstanceInfoDTO {
// workflow 状态WorkflowInstanceStatus
private Integer status;
// 工作流启动参数
private String wfInitParams;
private String dag;
private String result;

View File

@ -29,8 +29,12 @@ public class JsonUtils {
return null;
}
public static String toJSONStringUnsafe(Object obj) throws JsonProcessingException {
public static String toJSONStringUnsafe(Object obj) {
try {
return objectMapper.writeValueAsString(obj);
}catch (Exception e) {
throw new PowerJobException(e);
}
}
public static byte[] toBytes(Object obj) {

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.3.1</version>
<version>3.3.2</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.3.4.RELEASE</springboot.version>
<powerjob.common.version>3.3.1</powerjob.common.version>
<powerjob.common.version>3.3.2</powerjob.common.version>
<!-- 数据库驱动版本使用的是spring-boot-dependencies管理的版本 -->
<mysql.version>8.0.19</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version>

View File

@ -1,8 +1,11 @@
package com.github.kfcfans.powerjob.server.akka;
import akka.actor.*;
import akka.pattern.Patterns;
import akka.routing.RoundRobinPool;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.akka.actors.FriendActor;
import com.github.kfcfans.powerjob.server.akka.actors.ServerActor;
@ -16,8 +19,10 @@ import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletionStage;
/**
* 服务端 ActorSystem 启动器
@ -90,4 +95,19 @@ public class OhMyServer {
String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.WORKER_ACTOR_NAME);
return actorSystem.actorSelection(path);
}
/**
* ASK 其他 powerjob-server要求 AskResponse 中的 Data String
* @param address 其他 powerjob-server 的地址ip:port
* @param request 请求
* @return 返回值 OR 异常
*/
public static String askFriend(String address, Object request) throws Exception {
CompletionStage<Object> askCS = Patterns.ask(getFriendActor(address), request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get();
if (askResponse.isSuccess()) {
return askResponse.parseDataAsString();
}
throw new PowerJobException("remote server process failed:" + askResponse.getMessage());
}
}

View File

@ -1,11 +1,16 @@
package com.github.kfcfans.powerjob.server.akka.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.model.SystemMetrics;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.akka.requests.FriendQueryWorkerClusterStatusReq;
import com.github.kfcfans.powerjob.server.akka.requests.Ping;
import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.service.JobService;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowService;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@ -22,6 +27,7 @@ public class FriendActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder()
.match(Ping.class, this::onReceivePing)
.match(RunJobOrWorkflowReq.class, this::onReceiveFriendResendRunRequest)
.match(FriendQueryWorkerClusterStatusReq.class, this::onReceiveFriendQueryWorkerClusterStatusReq)
.matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj))
.build();
@ -42,4 +48,27 @@ public class FriendActor extends AbstractActor {
AskResponse askResponse = AskResponse.succeed(workerInfo);
getSender().tell(askResponse, getSelf());
}
/**
* 处理 run 转发
*/
private void onReceiveFriendResendRunRequest(RunJobOrWorkflowReq req) {
try {
Long resultId;
switch (req.getType()) {
case RunJobOrWorkflowReq.WORKFLOW:
resultId = SpringUtils.getBean(WorkflowService.class).runWorkflow(req.getId(), req.getAppId(), req.getParams(), req.getDelay());
break;
case RunJobOrWorkflowReq.JOB:
resultId = SpringUtils.getBean(JobService.class).runJob(req.getId(), req.getParams(), req.getDelay());
break;
default:
throw new PowerJobException("unknown type: " + req.getType());
}
getSender().tell(AskResponse.succeed(String.valueOf(resultId)), getSelf());
} catch (Exception e) {
log.error("[FriendActor] process run request [{}] failed!", req, e);
getSender().tell(AskResponse.failed(e.getMessage()), getSelf());
}
}
}

View File

@ -0,0 +1,27 @@
package com.github.kfcfans.powerjob.server.akka.requests;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 运行 Job 工作流需要转发到 server 进行否则没有集群信息
*
* @author tjq
* @since 11/7/20
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RunJobOrWorkflowReq implements OmsSerializable {
public static final int JOB = 1;
public static final int WORKFLOW = 2;
private int type;
private long id;
private long delay;
private String params;
private long appId;
}

View File

@ -68,7 +68,7 @@ public class DispatchService {
// 检查当前任务是否被取消
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfo.getStatus() == CANCELED.getV()) {
if ( CANCELED.getV() == instanceInfo.getStatus()) {
log.info("[Dispatcher-{}|{}] cancel dispatch due to instance has been canceled", jobId, instanceId);
return;
}

View File

@ -5,11 +5,15 @@ import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
@ -22,6 +26,7 @@ import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
@ -44,6 +49,9 @@ public class JobService {
@Resource
private InstanceInfoRepository instanceInfoRepository;
@Resource
private AppInfoRepository appInfoRepository;
/**
* 保存/修改任务
* @param request 任务请求
@ -70,9 +78,8 @@ public class JobService {
jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV());
jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
if (jobInfoDO.getMaxWorkerCount() == null) {
jobInfoDO.setMaxWorkerCount(0);
}
// 填充默认值非空保护防止 NPE
fillDefaultValue(jobInfoDO);
// 转化报警用户列表
if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) {
@ -103,12 +110,30 @@ public class JobService {
*/
public long runJob(Long jobId, String instanceParams, long delay) {
log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobId, instanceParams, delay);
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
AppInfoDO appInfo = appInfoRepository.findById(jobInfo.getAppId()).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by appId: " + jobInfo.getAppId()));
String targetServer = appInfo.getCurrentServer();
if (Objects.equals(targetServer, OhMyServer.getActorSystemAddress())) {
return realRunJob(jobInfo, instanceParams, delay);
}
// 转发请求
log.info("[Job-{}] redirect run request[params={}] to target server: {}", jobId, instanceParams, targetServer);
RunJobOrWorkflowReq req = new RunJobOrWorkflowReq(RunJobOrWorkflowReq.JOB, jobId, delay, instanceParams, jobInfo.getAppId());
try {
return Long.parseLong(OhMyServer.askFriend(targetServer, req));
}catch (Exception e) {
log.error("[Job-{}] redirect run request[params={}] to target server[{}] failed!", jobId, instanceParams, targetServer);
throw new PowerJobException("redirect run request failed!", e);
}
}
private long realRunJob(JobInfoDO jobInfo, String instanceParams, long delay) {
log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobInfo.getId(), instanceParams, delay);
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
instanceInfoRepository.flush();
if (delay <= 0) {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
}else {
@ -116,7 +141,7 @@ public class JobService {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
});
}
log.info("[Job-{}] run job successfully, instanceId={}", jobId, instanceId);
log.info("[Job-{}] run job successfully, params={}, instanceId={}", jobInfo.getId(), instanceParams, instanceId);
return instanceId;
}
@ -204,4 +229,22 @@ public class JobService {
jobInfoDO.setGmtModified(now);
}
private void fillDefaultValue(JobInfoDO jobInfoDO) {
if (jobInfoDO.getMaxWorkerCount() == null) {
jobInfoDO.setMaxWorkerCount(0);
}
if (jobInfoDO.getMaxInstanceNum() == null) {
jobInfoDO.setMaxInstanceNum(0);
}
if (jobInfoDO.getConcurrency() == null) {
jobInfoDO.setConcurrency(5);
}
if (jobInfoDO.getInstanceRetryNum() == null) {
jobInfoDO.setInstanceRetryNum(0);
}
if (jobInfoDO.getTaskRetryNum() == null) {
jobInfoDO.setTaskRetryNum(0);
}
}
}

View File

@ -248,6 +248,7 @@ public class InstanceService {
if (askResponse.isSuccess()) {
InstanceDetail instanceDetail = askResponse.getData(InstanceDetail.class);
instanceDetail.setRunningTimes(instanceInfoDO.getRunningTimes());
instanceDetail.setInstanceParams(instanceInfoDO.getInstanceParams());
return instanceDetail;
}else {
log.warn("[Instance-{}] ask InstanceStatus from TaskTracker failed, the message is {}.", instanceId, askResponse.getMessage());

View File

@ -5,18 +5,24 @@ import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.WorkflowInfoDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
import java.util.Objects;
/**
* Workflow 服务
@ -24,9 +30,12 @@ import java.util.Date;
* @author tjq
* @since 2020/5/26
*/
@Slf4j
@Service
public class WorkflowService {
@Resource
private AppInfoRepository appInfoRepository;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
@Resource
@ -138,8 +147,28 @@ public class WorkflowService {
public Long runWorkflow(Long wfId, Long appId, String initParams, long delay) {
WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams);
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by appId: " + appId));
String targetServer = appInfo.getCurrentServer();
if (Objects.equals(targetServer, OhMyServer.getActorSystemAddress())) {
return realRunWorkflow(wfInfo, initParams, delay);
}
log.info("[WorkflowService-{}] redirect run request[initParams={}] to target server: {}", wfId, initParams, targetServer);
// 转发请求
RunJobOrWorkflowReq req = new RunJobOrWorkflowReq(RunJobOrWorkflowReq.WORKFLOW, wfId, delay, initParams, appId);
try {
return Long.valueOf(OhMyServer.askFriend(targetServer, req));
}catch (Exception e) {
log.error("[WorkflowService-{}] redirect run request[params={}] to target server[{}] failed!", wfId, initParams, targetServer);
throw new PowerJobException("redirect run request failed!", e);
}
}
private Long realRunWorkflow(WorkflowInfoDO wfInfo, String initParams, long delay) {
log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams);
if (delay <= 0) {
workflowInstanceManager.start(wfInfo, wfInstanceId, initParams);
}else {

View File

@ -34,6 +34,8 @@ public class InstanceDetailVO {
private String result;
// TaskTracker地址
private String taskTrackerAddress;
// 启动参数
private String instanceParams;
// MR或BD任务专用
private InstanceDetailVO.TaskDetail taskDetail;

View File

@ -26,6 +26,8 @@ public class WorkflowInstanceInfoVO {
// workflow 状态WorkflowInstanceStatus
private Integer status;
// 工作流启动参数
private String wfInitParams;
private PEWorkflowDAG pEWorkflowDAG;
private String result;

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.3.1</version>
<version>3.3.2</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.3.1</powerjob.worker.version>
<powerjob.worker.version>3.3.2</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.3.1</version>
<version>3.3.2</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.starter.version>3.3.1</powerjob.worker.starter.version>
<powerjob.worker.starter.version>3.3.2</powerjob.worker.starter.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>3.3.1</version>
<version>3.3.2</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.3.1</powerjob.worker.version>
<powerjob.worker.version>3.3.2</powerjob.worker.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
</properties>

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.3.1</version>
<version>3.3.2</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.3.1</powerjob.common.version>
<powerjob.common.version>3.3.2</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

View File

@ -323,7 +323,7 @@ public class ProcessorTracker {
if (processor == null) {
log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo);
throw new PowerJobException("fetch Processor failed");
throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config");
}
}

View File

@ -236,8 +236,8 @@ public class CommonTaskTracker extends TaskTracker {
try {
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverAccepted = askResponse.isSuccess();
}catch (Exception ignore) {
log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result);
}catch (Exception e) {
log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result, e);
}
// 服务器未接受上报则等待下次重新上报