diff --git a/README.md b/README.md
index 936894e6..bfcf5996 100644
--- a/README.md
+++ b/README.md
@@ -26,7 +26,7 @@ PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架,
* 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。
* 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
* 有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce处理器完成任务的分发,调动整个集群加速计算。
-* 有需要延迟执行某些任务的业务场景:比如订单过期处理等。
+* 有需要**延迟执行**某些任务的业务场景:比如订单过期处理等。
### 设计目标
PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server,旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。
@@ -43,7 +43,7 @@ PowerJob 的设计目标为企业级的分布式任务调度平台,即成为
| -------------- | ------------------------ | ---------------------------------------- | ------------------------------------------------- | ------------------------------------------------------------ |
| 定时类型 | CRON | CRON | CRON、固定频率、固定延迟、OpenAPI | **CRON、固定频率、固定延迟、OpenAPI** |
| 任务类型 | 内置Java | 内置Java、GLUE Java、Shell、Python等脚本 | 内置Java、外置Java(FatJar)、Shell、Python等脚本 | **内置Java、外置Java(容器)、Shell、Python等脚本** |
-| 分布式任务 | 无 | 静态分片 | MapReduce动态分片 | **MapReduce动态分片** |
+| 分布式计算 | 无 | 静态分片 | MapReduce动态分片 | **MapReduce动态分片** |
| 在线任务治理 | 不支持 | 支持 | 支持 | **支持** |
| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** |
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** |
@@ -52,20 +52,21 @@ PowerJob 的设计目标为企业级的分布式任务调度平台,即成为
| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** |
-# 文档
+# 官方文档
**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)**
**[Document](https://www.yuque.com/powerjob/en/xrdoqw)**
PS:感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献!
-# 参考
->Alibaba SchedulerX 2.0
+# 接入登记
+[点击进行接入登记,为 PowerJob 的发展贡献自己的力量!](https://github.com/KFCFans/PowerJob/issues/6)
-* [Akka 框架](https://yq.aliyun.com/articles/709946?spm=a2c4e.11153959.teamhomeleft.67.6a0560c9bZEnZq):不得不说,akka-remote简化了相当大一部分的网络通讯代码。
-* [执行器架构设计](https://yq.aliyun.com/articles/704121?spm=a2c4e.11153959.teamhomeleft.97.371960c9qhB1mB):这篇文章反而不太认同,感觉我个人的设计更符合Yarn的“架构”。
-* [MapReduce模型](https://yq.aliyun.com/articles/706820?spm=a2c4e.11153959.teamhomeleft.83.6a0560c9bZEnZq):想法很Cool,大数据处理框架都是处理器向数据移动,但对于传统Java应用来说,数据向处理器移动也未尝不可,这样还能使框架的实现变得简单很多。
-* [广播执行](https://yq.aliyun.com/articles/716203?spm=a2c4e.11153959.teamhomeleft.40.371960c9qhB1mB):运行清理日志脚本什么的,也太实用了8~
+ღ( ´・ᴗ・\` )ღ 感谢以下接入用户的大力支持 ღ( ´・ᴗ・\` )ღ
+
+
+
+
# 其他
* 开源许可证:Apache License, Version 2.0
diff --git a/others/images/user.png b/others/images/user.png
index cc98efb6..4c962778 100644
Binary files a/others/images/user.png and b/others/images/user.png differ
diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml
index 987c1d41..c17c1dee 100644
--- a/powerjob-client/pom.xml
+++ b/powerjob-client/pom.xml
@@ -10,13 +10,13 @@
4.0.0
powerjob-client
- 3.3.1
+ 3.3.2
jar
5.6.1
1.2.68
- 3.3.1
+ 3.3.2
3.2.4
diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
index cadeb661..04efad72 100644
--- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
+++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
@@ -2,13 +2,14 @@ package com.github.kfcfans.powerjob.client;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.InstanceStatus;
-import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.OpenAPIConstant;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.*;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
+import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import okhttp3.FormBody;
@@ -20,6 +21,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Objects;
+import static com.github.kfcfans.powerjob.client.TypeStore.*;
+
/**
* OpenAPI 客户端
*
@@ -27,7 +30,6 @@ import java.util.Objects;
* @since 2020/4/15
*/
@Slf4j
-@SuppressWarnings("rawtypes, unchecked")
public class OhMyClient {
private Long appId;
@@ -62,9 +64,9 @@ public class OhMyClient {
try {
String result = assertApp(appName, password, url);
if (StringUtils.isNotEmpty(result)) {
- ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class);
+ ResultDTO resultDTO = JSONObject.parseObject(result, LONG_RESULT_TYPE);
if (resultDTO.isSuccess()) {
- appId = Long.parseLong(resultDTO.getData().toString());
+ appId = resultDTO.getData();
currentAddress = addr;
break;
}else {
@@ -101,75 +103,75 @@ public class OhMyClient {
* 保存任务(包括创建与修改)
* @param request 任务详细参数
* @return 创建的任务ID
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO saveJob(SaveJobInfoRequest request) throws Exception {
+ public ResultDTO saveJob(SaveJobInfoRequest request) throws PowerJobException {
request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JSONObject.toJSONString(request);
String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json));
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, LONG_RESULT_TYPE);
}
/**
* 根据 jobId 查询任务信息
* @param jobId 任务ID
* @return 任务详细信息
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO fetchJob(Long jobId) throws Exception {
+ public ResultDTO fetchJob(Long jobId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_JOB, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, JOB_RESULT_TYPE);
}
/**
* 禁用某个任务
* @param jobId 任务ID
* @return 标准返回对象
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO disableJob(Long jobId) throws Exception {
+ public ResultDTO disableJob(Long jobId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DISABLE_JOB, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
* 启用某个任务
* @param jobId 任务ID
* @return 标准返回对象
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO enableJob(Long jobId) throws Exception {
+ public ResultDTO enableJob(Long jobId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.ENABLE_JOB, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
* 删除某个任务
* @param jobId 任务ID
* @return 标准返回对象
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO deleteJob(Long jobId) throws Exception {
+ public ResultDTO deleteJob(Long jobId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DELETE_JOB, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
@@ -178,9 +180,9 @@ public class OhMyClient {
* @param instanceParams 任务实例的参数
* @param delayMS 延迟时间,单位毫秒
* @return 任务实例ID(instanceId)
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO runJob(Long jobId, String instanceParams, long delayMS) throws Exception {
+ public ResultDTO runJob(Long jobId, String instanceParams, long delayMS) throws PowerJobException {
FormBody.Builder builder = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
@@ -190,9 +192,9 @@ public class OhMyClient {
builder.add("instanceParams", instanceParams);
}
String post = postHA(OpenAPIConstant.RUN_JOB, builder.build());
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, LONG_RESULT_TYPE);
}
- public ResultDTO runJob(Long jobId) throws Exception {
+ public ResultDTO runJob(Long jobId) throws PowerJobException {
return runJob(jobId, null, 0);
}
@@ -201,15 +203,15 @@ public class OhMyClient {
* 停止应用实例
* @param instanceId 应用实例ID
* @return true 停止成功,false 停止失败
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO stopInstance(Long instanceId) throws Exception {
+ public ResultDTO stopInstance(Long instanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.STOP_INSTANCE, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
@@ -217,15 +219,15 @@ public class OhMyClient {
* 接口使用条件:调用接口时间与待取消任务的预计执行时间有一定时间间隔,否则不保证可靠性!
* @param instanceId 任务实例ID
* @return true 代表取消成功,false 取消失败
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO cancelInstance(Long instanceId) throws Exception {
+ public ResultDTO cancelInstance(Long instanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.CANCEL_INSTANCE, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
@@ -233,43 +235,43 @@ public class OhMyClient {
* 只有完成状态(成功、失败、手动停止、被取消)的任务才能被重试,且暂不支持工作流内任务实例的重试
* @param instanceId 任务实例ID
* @return true 代表取消成功,false 取消失败
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO retryInstance(Long instanceId) throws Exception {
+ public ResultDTO retryInstance(Long instanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.RETRY_INSTANCE, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
* 查询任务实例状态
* @param instanceId 应用实例ID
* @return {@link InstanceStatus} 的枚举值
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO fetchInstanceStatus(Long instanceId) throws Exception {
+ public ResultDTO fetchInstanceStatus(Long instanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_INSTANCE_STATUS, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, INTEGER_RESULT_TYPE);
}
/**
* 查询任务实例的信息
* @param instanceId 任务实例ID
* @return 任务实例信息
- * @throws Exception 潜在的异常
+ * @throws PowerJobException 潜在的异常
*/
- public ResultDTO fetchInstanceInfo(Long instanceId) throws Exception {
+ public ResultDTO fetchInstanceInfo(Long instanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_INSTANCE_INFO, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, INSTANCE_RESULT_TYPE);
}
/* ************* Workflow 区 ************* */
@@ -277,74 +279,75 @@ public class OhMyClient {
* 保存工作流(包括创建和修改)
* @param request 创建/修改 Workflow 请求
* @return 工作流ID
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO saveWorkflow(SaveWorkflowRequest request) throws Exception {
+ public ResultDTO saveWorkflow(SaveWorkflowRequest request) throws PowerJobException {
request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
- String json = JSONObject.toJSONString(request);
+ // 中坑记录:用 FastJSON 序列化会导致 Server 接收时 pEWorkflowDAG 为 null,无语.jpg
+ String json = JsonUtils.toJSONStringUnsafe(request);
String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json));
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, LONG_RESULT_TYPE);
}
/**
* 根据 workflowId 查询工作流信息
* @param workflowId workflowId
* @return 工作流信息
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO fetchWorkflow(Long workflowId) throws Exception {
+ public ResultDTO fetchWorkflow(Long workflowId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_WORKFLOW, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, WF_RESULT_TYPE);
}
/**
* 禁用某个工作流
* @param workflowId 工作流ID
* @return 标准返回对象
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO disableWorkflow(Long workflowId) throws Exception {
+ public ResultDTO disableWorkflow(Long workflowId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DISABLE_WORKFLOW, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
* 启用某个工作流
* @param workflowId workflowId
* @return 标准返回对象
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO enableWorkflow(Long workflowId) throws Exception {
+ public ResultDTO enableWorkflow(Long workflowId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.ENABLE_WORKFLOW, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
* 删除某个工作流
* @param workflowId workflowId
* @return 标准返回对象
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO deleteWorkflow(Long workflowId) throws Exception {
+ public ResultDTO deleteWorkflow(Long workflowId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DELETE_WORKFLOW, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
@@ -353,9 +356,9 @@ public class OhMyClient {
* @param initParams 启动参数
* @param delayMS 延迟时间,单位毫秒 ms
* @return 工作流实例ID
- * @throws Exception 异常信息
+ * @throws PowerJobException 异常信息
*/
- public ResultDTO runWorkflow(Long workflowId, String initParams, long delayMS) throws Exception {
+ public ResultDTO runWorkflow(Long workflowId, String initParams, long delayMS) throws PowerJobException {
FormBody.Builder builder = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
@@ -364,9 +367,9 @@ public class OhMyClient {
builder.add("initParams", initParams);
}
String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build());
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, LONG_RESULT_TYPE);
}
- public ResultDTO runWorkflow(Long workflowId) throws Exception {
+ public ResultDTO runWorkflow(Long workflowId) throws PowerJobException {
return runWorkflow(workflowId, null, 0);
}
@@ -375,30 +378,30 @@ public class OhMyClient {
* 停止应用实例
* @param wfInstanceId 工作流实例ID
* @return true 停止成功 ; false 停止失败
- * @throws Exception 异常
+ * @throws PowerJobException 异常
*/
- public ResultDTO stopWorkflowInstance(Long wfInstanceId) throws Exception {
+ public ResultDTO stopWorkflowInstance(Long wfInstanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("wfInstanceId", wfInstanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.STOP_WORKFLOW_INSTANCE, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, VOID_RESULT_TYPE);
}
/**
* 查询任务实例的信息
* @param wfInstanceId 任务实例ID
* @return 任务实例信息
- * @throws Exception 潜在的异常
+ * @throws PowerJobException 潜在的异常
*/
- public ResultDTO fetchWorkflowInstanceInfo(Long wfInstanceId) throws Exception {
+ public ResultDTO fetchWorkflowInstanceInfo(Long wfInstanceId) throws PowerJobException {
RequestBody body = new FormBody.Builder()
.add("wfInstanceId", wfInstanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO, body);
- return JSONObject.parseObject(post, ResultDTO.class);
+ return JSONObject.parseObject(post, WF_INSTANCE_RESULT_TYPE);
}
@@ -412,7 +415,7 @@ public class OhMyClient {
if (StringUtils.isNotEmpty(res)) {
return res;
}
- }catch (Exception e) {
+ }catch (IOException e) {
log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString());
}
@@ -429,7 +432,7 @@ public class OhMyClient {
currentAddress = addr;
return res;
}
- }catch (Exception e) {
+ }catch (IOException e) {
log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString());
}
}
diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/TypeStore.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/TypeStore.java
new file mode 100644
index 00000000..754e35ed
--- /dev/null
+++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/TypeStore.java
@@ -0,0 +1,28 @@
+package com.github.kfcfans.powerjob.client;
+
+import com.alibaba.fastjson.TypeReference;
+import com.github.kfcfans.powerjob.common.response.*;
+
+/**
+ * 类型工厂
+ *
+ * @author tjq
+ * @since 11/7/20
+ */
+public class TypeStore {
+
+ public static final TypeReference> VOID_RESULT_TYPE = new TypeReference>(){};
+
+ public static final TypeReference> INTEGER_RESULT_TYPE = new TypeReference>(){};
+
+ public static final TypeReference> LONG_RESULT_TYPE = new TypeReference>(){};
+
+ public static final TypeReference> JOB_RESULT_TYPE = new TypeReference>(){};
+
+ public static final TypeReference> INSTANCE_RESULT_TYPE = new TypeReference>() {};
+
+ public static final TypeReference> WF_RESULT_TYPE = new TypeReference>() {};
+
+ public static final TypeReference> WF_INSTANCE_RESULT_TYPE = new TypeReference>() {};
+
+}
diff --git a/powerjob-client/src/test/java/TestClient.java b/powerjob-client/src/test/java/TestClient.java
index dd469c99..f50aed98 100644
--- a/powerjob-client/src/test/java/TestClient.java
+++ b/powerjob-client/src/test/java/TestClient.java
@@ -21,6 +21,8 @@ public class TestClient {
private static OhMyClient ohMyClient;
+ public static final long JOB_ID = 4L;
+
@BeforeAll
public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
@@ -30,7 +32,7 @@ public class TestClient {
public void testSaveJob() throws Exception {
SaveJobInfoRequest newJobInfo = new SaveJobInfoRequest();
-// newJobInfo.setId(8L);
+ newJobInfo.setId(JOB_ID);
newJobInfo.setJobName("omsOpenAPIJobccccc");
newJobInfo.setJobDescription("tes OpenAPI");
newJobInfo.setJobParams("{'aa':'bb'}");
@@ -38,8 +40,8 @@ public class TestClient {
newJobInfo.setTimeExpression("0 0 * * * ? ");
newJobInfo.setExecuteType(ExecuteType.STANDALONE);
newJobInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA);
- newJobInfo.setProcessorInfo("com.github.kfcfans.oms.server.tester.OmsLogPerformanceTester");
- newJobInfo.setDesignatedWorkers("192.168.1.1:2777");
+ newJobInfo.setProcessorInfo("com.github.kfcfans.powerjob.samples.processors.StandaloneProcessorDemo");
+ newJobInfo.setDesignatedWorkers("");
newJobInfo.setMinCpuCores(1.1);
newJobInfo.setMinMemorySpace(1.2);
@@ -51,48 +53,53 @@ public class TestClient {
@Test
public void testFetchJob() throws Exception {
- ResultDTO fetchJob = ohMyClient.fetchJob(1L);
+ ResultDTO fetchJob = ohMyClient.fetchJob(JOB_ID);
System.out.println(JSONObject.toJSONString(fetchJob));
}
@Test
public void testDisableJob() throws Exception {
- System.out.println(ohMyClient.disableJob(7L));
+ System.out.println(ohMyClient.disableJob(JOB_ID));
}
@Test
public void testEnableJob() throws Exception {
- System.out.println(ohMyClient.enableJob(7L));
+ System.out.println(ohMyClient.enableJob(JOB_ID));
}
@Test
public void testDeleteJob() throws Exception {
- System.out.println(ohMyClient.deleteJob(7L));
+ System.out.println(ohMyClient.deleteJob(JOB_ID));
}
@Test
- public void testRunJob() throws Exception {
- System.out.println(ohMyClient.runJob(6L, "this is instanceParams", 60000));
+ public void testRun() {
+ System.out.println(ohMyClient.runJob(JOB_ID));
+ }
+
+ @Test
+ public void testRunJobDelay() throws Exception {
+ System.out.println(ohMyClient.runJob(JOB_ID, "this is instanceParams", 60000));
}
@Test
public void testFetchInstanceInfo() throws Exception {
- System.out.println(ohMyClient.fetchInstanceInfo(141251409466097728L));
+ System.out.println(ohMyClient.fetchInstanceInfo(205436386851946560L));
}
@Test
public void testStopInstance() throws Exception {
- ResultDTO res = ohMyClient.stopInstance(141251409466097728L);
+ ResultDTO res = ohMyClient.stopInstance(205436995885858880L);
System.out.println(res.toString());
}
@Test
public void testFetchInstanceStatus() throws Exception {
- System.out.println(ohMyClient.fetchInstanceStatus(141251409466097728L));
+ System.out.println(ohMyClient.fetchInstanceStatus(205436995885858880L));
}
@Test
public void testCancelInstanceInTimeWheel() throws Exception {
- ResultDTO startRes = ohMyClient.runJob(15L, "start by OhMyClient", 20000);
+ ResultDTO startRes = ohMyClient.runJob(JOB_ID, "start by OhMyClient", 20000);
System.out.println("runJob result: " + JSONObject.toJSONString(startRes));
ResultDTO cancelRes = ohMyClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JSONObject.toJSONString(cancelRes));
diff --git a/powerjob-client/src/test/java/TestWorkflow.java b/powerjob-client/src/test/java/TestWorkflow.java
index 9e04d80e..6feaf266 100644
--- a/powerjob-client/src/test/java/TestWorkflow.java
+++ b/powerjob-client/src/test/java/TestWorkflow.java
@@ -22,6 +22,8 @@ public class TestWorkflow {
private static OhMyClient ohMyClient;
+ private static final long WF_ID = 1;
+
@BeforeAll
public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
@@ -64,32 +66,33 @@ public class TestWorkflow {
req.setEnable(true);
req.setTimeExpressionType(TimeExpressionType.API);
+ System.out.println("req ->" + JSONObject.toJSON(req));
System.out.println(ohMyClient.saveWorkflow(req));
}
@Test
public void testDisableWorkflow() throws Exception {
- System.out.println(ohMyClient.disableWorkflow(4L));
+ System.out.println(ohMyClient.disableWorkflow(WF_ID));
}
@Test
public void testDeleteWorkflow() throws Exception {
- System.out.println(ohMyClient.deleteWorkflow(4L));
+ System.out.println(ohMyClient.deleteWorkflow(WF_ID));
}
@Test
public void testEnableWorkflow() throws Exception {
- System.out.println(ohMyClient.enableWorkflow(4L));
+ System.out.println(ohMyClient.enableWorkflow(WF_ID));
}
@Test
public void testFetchWorkflowInfo() throws Exception {
- System.out.println(ohMyClient.fetchWorkflow(5L));
+ System.out.println(ohMyClient.fetchWorkflow(WF_ID));
}
@Test
public void testRunWorkflow() throws Exception {
- System.out.println(ohMyClient.runWorkflow(5L));
+ System.out.println(ohMyClient.runWorkflow(WF_ID));
}
@Test
@@ -104,6 +107,6 @@ public class TestWorkflow {
@Test
public void testRunWorkflowPlus() throws Exception {
- System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 2", 90000));
+ System.out.println(ohMyClient.runWorkflow(WF_ID, "this is init Params 2", 90000));
}
}
diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml
index 3bca0b62..6c9b9ec8 100644
--- a/powerjob-common/pom.xml
+++ b/powerjob-common/pom.xml
@@ -10,7 +10,7 @@
4.0.0
powerjob-common
- 3.3.1
+ 3.3.2
jar
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java
index 3fa65ab0..814e104a 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java
@@ -33,5 +33,5 @@ public class RemoteConstant {
/* ************************ OTHERS ************************ */
public static final String EMPTY_ADDRESS = "N/A";
- public static final long DEFAULT_TIMEOUT_MS = 3000;
+ public static final long DEFAULT_TIMEOUT_MS = 5000;
}
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java
index 6706bca0..e690c3ab 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java
@@ -28,6 +28,8 @@ public class InstanceDetail implements OmsSerializable {
private String result;
// TaskTracker地址
private String taskTrackerAddress;
+ // 启动参数
+ private String instanceParams;
// MR或BD任务专用
private TaskDetail taskDetail;
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java
index afc967af..3fe258ca 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java
@@ -9,6 +9,7 @@ import lombok.NoArgsConstructor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.util.List;
/**
@@ -20,7 +21,7 @@ import java.util.List;
*/
@Data
@NoArgsConstructor
-public class PEWorkflowDAG {
+public class PEWorkflowDAG implements Serializable {
// DAG 图(点线表示法)
private List nodes;
@@ -30,7 +31,7 @@ public class PEWorkflowDAG {
@Data
@NoArgsConstructor
@AllArgsConstructor
- public static class Node {
+ public static class Node implements Serializable {
private Long jobId;
private String jobName;
@@ -50,7 +51,7 @@ public class PEWorkflowDAG {
@Data
@NoArgsConstructor
@AllArgsConstructor
- public static class Edge {
+ public static class Edge implements Serializable {
private Long from;
private Long to;
}
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java
index 3e0e03e1..3bf4138a 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java
@@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.google.common.collect.Lists;
import lombok.Data;
+import java.io.Serializable;
import java.util.List;
/**
@@ -15,7 +16,7 @@ import java.util.List;
* @since 2020/5/26
*/
@Data
-public class SaveWorkflowRequest {
+public class SaveWorkflowRequest implements Serializable {
private Long id;
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java
index b6fc9d28..5f6faa89 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java
@@ -53,4 +53,8 @@ public class AskResponse implements OmsSerializable {
return JsonUtils.parseObject(data, clz);
}
+ public String parseDataAsString() {
+ return new String(data, StandardCharsets.UTF_8);
+ }
+
}
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java
index 610ce15b..c9a22df4 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java
@@ -21,6 +21,8 @@ public class WorkflowInstanceInfoDTO {
// workflow 状态(WorkflowInstanceStatus)
private Integer status;
+ // 工作流启动参数
+ private String wfInitParams;
private String dag;
private String result;
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java
index 2e91357f..68cc5123 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java
@@ -29,8 +29,12 @@ public class JsonUtils {
return null;
}
- public static String toJSONStringUnsafe(Object obj) throws JsonProcessingException {
- return objectMapper.writeValueAsString(obj);
+ public static String toJSONStringUnsafe(Object obj) {
+ try {
+ return objectMapper.writeValueAsString(obj);
+ }catch (Exception e) {
+ throw new PowerJobException(e);
+ }
}
public static byte[] toBytes(Object obj) {
diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml
index a5e18322..e6e148e4 100644
--- a/powerjob-server/pom.xml
+++ b/powerjob-server/pom.xml
@@ -10,13 +10,13 @@
4.0.0
powerjob-server
- 3.3.1
+ 3.3.2
jar
2.9.2
2.3.4.RELEASE
- 3.3.1
+ 3.3.2
8.0.19
19.7.0.0
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java
index f4568e0e..f818675a 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java
@@ -1,8 +1,11 @@
package com.github.kfcfans.powerjob.server.akka;
import akka.actor.*;
+import akka.pattern.Patterns;
import akka.routing.RoundRobinPool;
+import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
+import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.akka.actors.FriendActor;
import com.github.kfcfans.powerjob.server.akka.actors.ServerActor;
@@ -16,8 +19,10 @@ import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import java.time.Duration;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CompletionStage;
/**
* 服务端 ActorSystem 启动器
@@ -90,4 +95,19 @@ public class OhMyServer {
String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.WORKER_ACTOR_NAME);
return actorSystem.actorSelection(path);
}
+
+ /**
+ * ASK 其他 powerjob-server,要求 AskResponse 中的 Data 为 String
+ * @param address 其他 powerjob-server 的地址(ip:port)
+ * @param request 请求
+ * @return 返回值 OR 异常
+ */
+ public static String askFriend(String address, Object request) throws Exception {
+ CompletionStage