From c8fd2b4257592a61c78b8f1dc85c6554e8ddbb33 Mon Sep 17 00:00:00 2001 From: mojianxiao Date: Fri, 16 Oct 2020 18:14:35 +0800 Subject: [PATCH 01/10] [opt] optimize code --- .../github/kfcfans/powerjob/server/service/DispatchService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index d47d7ba8..d92c7a01 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -68,7 +68,7 @@ public class DispatchService { // 检查当前任务是否被取消 InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); - if (instanceInfo.getStatus() == CANCELED.getV()) { + if ( CANCELED.getV() == instanceInfo.getStatus()) { log.info("[Dispatcher-{}|{}] cancel dispatch due to instance has been canceled", jobId, instanceId); return; } From e197a6f605fbbb0aa389960f8457d6634e664eb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctjq=E2=80=9D?= Date: Sat, 7 Nov 2020 18:11:22 +0800 Subject: [PATCH 02/10] feat: add instance params #92 --- .../github/kfcfans/powerjob/common/model/InstanceDetail.java | 2 ++ .../powerjob/server/service/instance/InstanceService.java | 1 + .../kfcfans/powerjob/server/web/response/InstanceDetailVO.java | 2 ++ 3 files changed, 5 insertions(+) diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java index 6706bca0..e690c3ab 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java @@ -28,6 +28,8 @@ public class InstanceDetail implements OmsSerializable { private String result; // TaskTracker地址 private String taskTrackerAddress; + // 启动参数 + private String instanceParams; // MR或BD任务专用 private TaskDetail taskDetail; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java index bab39209..6ad8b12f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java @@ -248,6 +248,7 @@ public class InstanceService { if (askResponse.isSuccess()) { InstanceDetail instanceDetail = askResponse.getData(InstanceDetail.class); instanceDetail.setRunningTimes(instanceInfoDO.getRunningTimes()); + instanceDetail.setInstanceParams(instanceInfoDO.getInstanceParams()); return instanceDetail; }else { log.warn("[Instance-{}] ask InstanceStatus from TaskTracker failed, the message is {}.", instanceId, askResponse.getMessage()); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/InstanceDetailVO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/InstanceDetailVO.java index 30c75864..f1081cba 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/InstanceDetailVO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/InstanceDetailVO.java @@ -34,6 +34,8 @@ public class InstanceDetailVO { private String result; // TaskTracker地址 private String taskTrackerAddress; + // 启动参数 + private String instanceParams; // MR或BD任务专用 private InstanceDetailVO.TaskDetail taskDetail; From 6924088d16d41a13d369ca800806ac5f158f9a1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctjq=E2=80=9D?= Date: Sat, 7 Nov 2020 18:27:36 +0800 Subject: [PATCH 03/10] fix: OpenAPI throw ClassCastException #93 --- .../kfcfans/powerjob/client/OhMyClient.java | 131 +++++++++--------- .../kfcfans/powerjob/client/TypeStore.java | 28 ++++ 2 files changed, 94 insertions(+), 65 deletions(-) create mode 100644 powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/TypeStore.java diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index cadeb661..31525b28 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -2,8 +2,8 @@ package com.github.kfcfans.powerjob.client; import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.OpenAPIConstant; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.response.*; @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.List; import java.util.Objects; +import static com.github.kfcfans.powerjob.client.TypeStore.*; + /** * OpenAPI 客户端 * @@ -27,7 +29,6 @@ import java.util.Objects; * @since 2020/4/15 */ @Slf4j -@SuppressWarnings("rawtypes, unchecked") public class OhMyClient { private Long appId; @@ -62,9 +63,9 @@ public class OhMyClient { try { String result = assertApp(appName, password, url); if (StringUtils.isNotEmpty(result)) { - ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class); + ResultDTO resultDTO = JSONObject.parseObject(result, LONG_RESULT_TYPE); if (resultDTO.isSuccess()) { - appId = Long.parseLong(resultDTO.getData().toString()); + appId = resultDTO.getData(); currentAddress = addr; break; }else { @@ -101,75 +102,75 @@ public class OhMyClient { * 保存任务(包括创建与修改) * @param request 任务详细参数 * @return 创建的任务ID - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO saveJob(SaveJobInfoRequest request) throws Exception { + public ResultDTO saveJob(SaveJobInfoRequest request) throws PowerJobException { request.setAppId(appId); MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); String json = JSONObject.toJSONString(request); String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json)); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, LONG_RESULT_TYPE); } /** * 根据 jobId 查询任务信息 * @param jobId 任务ID * @return 任务详细信息 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO fetchJob(Long jobId) throws Exception { + public ResultDTO fetchJob(Long jobId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("jobId", jobId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_JOB, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, JOB_RESULT_TYPE); } /** * 禁用某个任务 * @param jobId 任务ID * @return 标准返回对象 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO disableJob(Long jobId) throws Exception { + public ResultDTO disableJob(Long jobId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("jobId", jobId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.DISABLE_JOB, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, VOID_RESULT_TYPE); } /** * 启用某个任务 * @param jobId 任务ID * @return 标准返回对象 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO enableJob(Long jobId) throws Exception { + public ResultDTO enableJob(Long jobId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("jobId", jobId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.ENABLE_JOB, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, VOID_RESULT_TYPE); } /** * 删除某个任务 * @param jobId 任务ID * @return 标准返回对象 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO deleteJob(Long jobId) throws Exception { + public ResultDTO deleteJob(Long jobId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("jobId", jobId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.DELETE_JOB, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, VOID_RESULT_TYPE); } /** @@ -178,9 +179,9 @@ public class OhMyClient { * @param instanceParams 任务实例的参数 * @param delayMS 延迟时间,单位毫秒 * @return 任务实例ID(instanceId) - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO runJob(Long jobId, String instanceParams, long delayMS) throws Exception { + public ResultDTO runJob(Long jobId, String instanceParams, long delayMS) throws PowerJobException { FormBody.Builder builder = new FormBody.Builder() .add("jobId", jobId.toString()) .add("appId", appId.toString()) @@ -190,9 +191,9 @@ public class OhMyClient { builder.add("instanceParams", instanceParams); } String post = postHA(OpenAPIConstant.RUN_JOB, builder.build()); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, LONG_RESULT_TYPE); } - public ResultDTO runJob(Long jobId) throws Exception { + public ResultDTO runJob(Long jobId) throws PowerJobException { return runJob(jobId, null, 0); } @@ -201,15 +202,15 @@ public class OhMyClient { * 停止应用实例 * @param instanceId 应用实例ID * @return true 停止成功,false 停止失败 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO stopInstance(Long instanceId) throws Exception { + public ResultDTO stopInstance(Long instanceId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("instanceId", instanceId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.STOP_INSTANCE, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, VOID_RESULT_TYPE); } /** @@ -217,15 +218,15 @@ public class OhMyClient { * 接口使用条件:调用接口时间与待取消任务的预计执行时间有一定时间间隔,否则不保证可靠性! * @param instanceId 任务实例ID * @return true 代表取消成功,false 取消失败 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO cancelInstance(Long instanceId) throws Exception { + public ResultDTO cancelInstance(Long instanceId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("instanceId", instanceId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.CANCEL_INSTANCE, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, VOID_RESULT_TYPE); } /** @@ -233,43 +234,43 @@ public class OhMyClient { * 只有完成状态(成功、失败、手动停止、被取消)的任务才能被重试,且暂不支持工作流内任务实例的重试 * @param instanceId 任务实例ID * @return true 代表取消成功,false 取消失败 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO retryInstance(Long instanceId) throws Exception { + public ResultDTO retryInstance(Long instanceId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("instanceId", instanceId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.RETRY_INSTANCE, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, VOID_RESULT_TYPE); } /** * 查询任务实例状态 * @param instanceId 应用实例ID * @return {@link InstanceStatus} 的枚举值 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO fetchInstanceStatus(Long instanceId) throws Exception { + public ResultDTO fetchInstanceStatus(Long instanceId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("instanceId", instanceId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_INSTANCE_STATUS, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, INTEGER_RESULT_TYPE); } /** * 查询任务实例的信息 * @param instanceId 任务实例ID * @return 任务实例信息 - * @throws Exception 潜在的异常 + * @throws PowerJobException 潜在的异常 */ - public ResultDTO fetchInstanceInfo(Long instanceId) throws Exception { + public ResultDTO fetchInstanceInfo(Long instanceId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("instanceId", instanceId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_INSTANCE_INFO, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, INSTANCE_RESULT_TYPE); } /* ************* Workflow 区 ************* */ @@ -277,74 +278,74 @@ public class OhMyClient { * 保存工作流(包括创建和修改) * @param request 创建/修改 Workflow 请求 * @return 工作流ID - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO saveWorkflow(SaveWorkflowRequest request) throws Exception { + public ResultDTO saveWorkflow(SaveWorkflowRequest request) throws PowerJobException { request.setAppId(appId); MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); String json = JSONObject.toJSONString(request); String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json)); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, LONG_RESULT_TYPE); } /** * 根据 workflowId 查询工作流信息 * @param workflowId workflowId * @return 工作流信息 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO fetchWorkflow(Long workflowId) throws Exception { + public ResultDTO fetchWorkflow(Long workflowId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("workflowId", workflowId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_WORKFLOW, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, WF_RESULT_TYPE); } /** * 禁用某个工作流 * @param workflowId 工作流ID * @return 标准返回对象 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO disableWorkflow(Long workflowId) throws Exception { + public ResultDTO disableWorkflow(Long workflowId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("workflowId", workflowId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.DISABLE_WORKFLOW, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, VOID_RESULT_TYPE); } /** * 启用某个工作流 * @param workflowId workflowId * @return 标准返回对象 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO enableWorkflow(Long workflowId) throws Exception { + public ResultDTO enableWorkflow(Long workflowId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("workflowId", workflowId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.ENABLE_WORKFLOW, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, VOID_RESULT_TYPE); } /** * 删除某个工作流 * @param workflowId workflowId * @return 标准返回对象 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO deleteWorkflow(Long workflowId) throws Exception { + public ResultDTO deleteWorkflow(Long workflowId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("workflowId", workflowId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.DELETE_WORKFLOW, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, VOID_RESULT_TYPE); } /** @@ -353,9 +354,9 @@ public class OhMyClient { * @param initParams 启动参数 * @param delayMS 延迟时间,单位毫秒 ms * @return 工作流实例ID - * @throws Exception 异常信息 + * @throws PowerJobException 异常信息 */ - public ResultDTO runWorkflow(Long workflowId, String initParams, long delayMS) throws Exception { + public ResultDTO runWorkflow(Long workflowId, String initParams, long delayMS) throws PowerJobException { FormBody.Builder builder = new FormBody.Builder() .add("workflowId", workflowId.toString()) .add("appId", appId.toString()) @@ -364,9 +365,9 @@ public class OhMyClient { builder.add("initParams", initParams); } String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build()); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, LONG_RESULT_TYPE); } - public ResultDTO runWorkflow(Long workflowId) throws Exception { + public ResultDTO runWorkflow(Long workflowId) throws PowerJobException { return runWorkflow(workflowId, null, 0); } @@ -375,30 +376,30 @@ public class OhMyClient { * 停止应用实例 * @param wfInstanceId 工作流实例ID * @return true 停止成功 ; false 停止失败 - * @throws Exception 异常 + * @throws PowerJobException 异常 */ - public ResultDTO stopWorkflowInstance(Long wfInstanceId) throws Exception { + public ResultDTO stopWorkflowInstance(Long wfInstanceId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("wfInstanceId", wfInstanceId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.STOP_WORKFLOW_INSTANCE, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, VOID_RESULT_TYPE); } /** * 查询任务实例的信息 * @param wfInstanceId 任务实例ID * @return 任务实例信息 - * @throws Exception 潜在的异常 + * @throws PowerJobException 潜在的异常 */ - public ResultDTO fetchWorkflowInstanceInfo(Long wfInstanceId) throws Exception { + public ResultDTO fetchWorkflowInstanceInfo(Long wfInstanceId) throws PowerJobException { RequestBody body = new FormBody.Builder() .add("wfInstanceId", wfInstanceId.toString()) .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO, body); - return JSONObject.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, WF_INSTANCE_RESULT_TYPE); } @@ -412,7 +413,7 @@ public class OhMyClient { if (StringUtils.isNotEmpty(res)) { return res; } - }catch (Exception e) { + }catch (IOException e) { log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString()); } @@ -429,7 +430,7 @@ public class OhMyClient { currentAddress = addr; return res; } - }catch (Exception e) { + }catch (IOException e) { log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString()); } } diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/TypeStore.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/TypeStore.java new file mode 100644 index 00000000..754e35ed --- /dev/null +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/TypeStore.java @@ -0,0 +1,28 @@ +package com.github.kfcfans.powerjob.client; + +import com.alibaba.fastjson.TypeReference; +import com.github.kfcfans.powerjob.common.response.*; + +/** + * 类型工厂 + * + * @author tjq + * @since 11/7/20 + */ +public class TypeStore { + + public static final TypeReference> VOID_RESULT_TYPE = new TypeReference>(){}; + + public static final TypeReference> INTEGER_RESULT_TYPE = new TypeReference>(){}; + + public static final TypeReference> LONG_RESULT_TYPE = new TypeReference>(){}; + + public static final TypeReference> JOB_RESULT_TYPE = new TypeReference>(){}; + + public static final TypeReference> INSTANCE_RESULT_TYPE = new TypeReference>() {}; + + public static final TypeReference> WF_RESULT_TYPE = new TypeReference>() {}; + + public static final TypeReference> WF_INSTANCE_RESULT_TYPE = new TypeReference>() {}; + +} From 0cc7cc26b44ec996ebc18f014be58fd59e81edd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctjq=E2=80=9D?= Date: Sat, 7 Nov 2020 21:03:04 +0800 Subject: [PATCH 04/10] fix: runJob or runWorkflow failed when there are multi server in cluster --- .../powerjob/common/response/AskResponse.java | 4 +++ .../powerjob/server/akka/OhMyServer.java | 20 +++++++++++ .../server/akka/actors/FriendActor.java | 29 ++++++++++++++++ .../akka/requests/RunJobOrWorkflowReq.java | 27 +++++++++++++++ .../powerjob/server/service/JobService.java | 34 ++++++++++++++++--- .../service/workflow/WorkflowService.java | 30 +++++++++++++++- 6 files changed, 139 insertions(+), 5 deletions(-) create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/requests/RunJobOrWorkflowReq.java diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java index b6fc9d28..b41dba7d 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java @@ -53,4 +53,8 @@ public class AskResponse implements OmsSerializable { return JsonUtils.parseObject(data, clz); } + public String getDataAsString() { + return new String(data, StandardCharsets.UTF_8); + } + } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java index f4568e0e..513c9bf2 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java @@ -1,8 +1,11 @@ package com.github.kfcfans.powerjob.server.akka; import akka.actor.*; +import akka.pattern.Patterns; import akka.routing.RoundRobinPool; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.RemoteConstant; +import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.akka.actors.FriendActor; import com.github.kfcfans.powerjob.server.akka.actors.ServerActor; @@ -16,8 +19,10 @@ import com.typesafe.config.ConfigFactory; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import java.time.Duration; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletionStage; /** * 服务端 ActorSystem 启动器 @@ -90,4 +95,19 @@ public class OhMyServer { String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.WORKER_ACTOR_NAME); return actorSystem.actorSelection(path); } + + /** + * ASK 其他 powejob-server,要求 AskResponse 中的 Data 为 String + * @param address 其他 powejob-server 的地址(ip:port) + * @param request 请求 + * @return 返回值 OR 异常 + */ + public static String askFriend(String address, Object request) throws Exception { + CompletionStage askCS = Patterns.ask(getFriendActor(address), request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(); + if (askResponse.isSuccess()) { + return askResponse.getDataAsString(); + } + throw new PowerJobException("remote server process failed:" + askResponse.getMessage()); + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/FriendActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/FriendActor.java index f36aaa22..eb28c112 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/FriendActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/FriendActor.java @@ -1,11 +1,16 @@ package com.github.kfcfans.powerjob.server.akka.actors; import akka.actor.AbstractActor; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.model.SystemMetrics; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.server.akka.requests.FriendQueryWorkerClusterStatusReq; import com.github.kfcfans.powerjob.server.akka.requests.Ping; +import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq; +import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; +import com.github.kfcfans.powerjob.server.service.JobService; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; +import com.github.kfcfans.powerjob.server.service.workflow.WorkflowService; import lombok.extern.slf4j.Slf4j; import java.util.Map; @@ -22,6 +27,7 @@ public class FriendActor extends AbstractActor { public Receive createReceive() { return receiveBuilder() .match(Ping.class, this::onReceivePing) + .match(RunJobOrWorkflowReq.class, this::onReceiveFriendResendRunRequest) .match(FriendQueryWorkerClusterStatusReq.class, this::onReceiveFriendQueryWorkerClusterStatusReq) .matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj)) .build(); @@ -42,4 +48,27 @@ public class FriendActor extends AbstractActor { AskResponse askResponse = AskResponse.succeed(workerInfo); getSender().tell(askResponse, getSelf()); } + + /** + * 处理 run 转发 + */ + private void onReceiveFriendResendRunRequest(RunJobOrWorkflowReq req) { + try { + Long resultId; + switch (req.getType()) { + case RunJobOrWorkflowReq.WORKFLOW: + resultId = SpringUtils.getBean(WorkflowService.class).runWorkflow(req.getId(), req.getAppId(), req.getParams(), req.getDelay()); + break; + case RunJobOrWorkflowReq.JOB: + resultId = SpringUtils.getBean(JobService.class).runJob(req.getId(), req.getParams(), req.getDelay()); + break; + default: + throw new PowerJobException("unknown type: " + req.getType()); + } + getSender().tell(AskResponse.succeed(String.valueOf(resultId)), getSelf()); + } catch (Exception e) { + log.error("[FriendActor] process run request [{}] failed!", req, e); + getSender().tell(AskResponse.failed(e.getMessage()), getSelf()); + } + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/requests/RunJobOrWorkflowReq.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/requests/RunJobOrWorkflowReq.java new file mode 100644 index 00000000..517be316 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/requests/RunJobOrWorkflowReq.java @@ -0,0 +1,27 @@ +package com.github.kfcfans.powerjob.server.akka.requests; + +import com.github.kfcfans.powerjob.common.OmsSerializable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 运行 Job 或 工作流,需要转发到 server 进行,否则没有集群信息 + * + * @author tjq + * @since 11/7/20 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class RunJobOrWorkflowReq implements OmsSerializable { + public static final int JOB = 1; + public static final int WORKFLOW = 2; + + private int type; + private long id; + private long delay; + private String params; + + private long appId; +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index ad81184f..cc6ee4c9 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -5,11 +5,15 @@ import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.response.JobInfoDTO; +import com.github.kfcfans.powerjob.server.akka.OhMyServer; +import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq; import com.github.kfcfans.powerjob.server.common.SJ; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.CronExpression; +import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.service.instance.InstanceService; @@ -22,6 +26,7 @@ import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.Date; import java.util.List; +import java.util.Objects; import java.util.Optional; /** @@ -44,6 +49,9 @@ public class JobService { @Resource private InstanceInfoRepository instanceInfoRepository; + @Resource + private AppInfoRepository appInfoRepository; + /** * 保存/修改任务 * @param request 任务请求 @@ -103,12 +111,30 @@ public class JobService { */ public long runJob(Long jobId, String instanceParams, long delay) { - log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobId, instanceParams, delay); - JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); + + AppInfoDO appInfo = appInfoRepository.findById(jobInfo.getAppId()).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by appId: " + jobInfo.getAppId())); + String targetServer = appInfo.getCurrentServer(); + + if (Objects.equals(targetServer, OhMyServer.getActorSystemAddress())) { + return realRunJob(jobInfo, instanceParams, delay); + } + + // 转发请求 + log.info("[Job-{}] redirect run request to target server: {}", jobId, targetServer); + RunJobOrWorkflowReq req = new RunJobOrWorkflowReq(RunJobOrWorkflowReq.JOB, jobId, delay, instanceParams, jobInfo.getAppId()); + try { + return Long.parseLong(OhMyServer.askFriend(targetServer, req)); + }catch (Exception e) { + log.error("[Job-{}] redirect run request[params={}] to target server[{}] failed!", jobId, instanceParams, targetServer); + throw new PowerJobException("redirect run request failed!", e); + } + } + + private long realRunJob(JobInfoDO jobInfo, String instanceParams, long delay) { + log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobInfo.getId(), instanceParams, delay); Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0)); instanceInfoRepository.flush(); - if (delay <= 0) { dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); }else { @@ -116,7 +142,7 @@ public class JobService { dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); }); } - log.info("[Job-{}] run job successfully, instanceId={}", jobId, instanceId); + log.info("[Job-{}] run job successfully, params= {}, instanceId={}", jobInfo.getId(), instanceParams, instanceId); return instanceId; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java index 073d7491..81430898 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java @@ -5,13 +5,18 @@ import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.response.WorkflowInfoDTO; +import com.github.kfcfans.powerjob.server.akka.OhMyServer; +import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq; import com.github.kfcfans.powerjob.server.common.SJ; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.CronExpression; import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils; +import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository; import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; @@ -24,9 +29,12 @@ import java.util.Date; * @author tjq * @since 2020/5/26 */ +@Slf4j @Service public class WorkflowService { + @Resource + private AppInfoRepository appInfoRepository; @Resource private WorkflowInstanceManager workflowInstanceManager; @Resource @@ -138,8 +146,28 @@ public class WorkflowService { public Long runWorkflow(Long wfId, Long appId, String initParams, long delay) { WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); - Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams); + AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by appId: " + appId)); + + String targetServer = OhMyServer.getActorSystemAddress(); + if (targetServer.equals(appInfo.getCurrentServer())) { + return realRunWorkflow(wfInfo, initParams, delay); + } + + log.info("[WorkflowService-{}] redirect run request to target server: {}", wfId, targetServer); + // 转发请求 + RunJobOrWorkflowReq req = new RunJobOrWorkflowReq(RunJobOrWorkflowReq.WORKFLOW, wfId, delay, initParams, appId); + try { + return Long.valueOf(OhMyServer.askFriend(targetServer, req)); + }catch (Exception e) { + log.error("[WorkflowService-{}] redirect run request[params={}] to target server[{}] failed!", wfId, initParams, targetServer); + throw new PowerJobException("redirect run request failed!", e); + } + } + + private Long realRunWorkflow(WorkflowInfoDO wfInfo, String initParams, long delay) { + log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay); + Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams); if (delay <= 0) { workflowInstanceManager.start(wfInfo, wfInstanceId, initParams); }else { From 7ac70defd304f423eac3629b3208f3b324c89fba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctjq=E2=80=9D?= Date: Sat, 7 Nov 2020 21:57:16 +0800 Subject: [PATCH 05/10] fix: OpenAPI's saveWorkflow --- .../kfcfans/powerjob/client/OhMyClient.java | 4 ++- powerjob-client/src/test/java/TestClient.java | 33 +++++++++++-------- .../src/test/java/TestWorkflow.java | 1 + .../powerjob/common/RemoteConstant.java | 2 +- .../powerjob/common/model/PEWorkflowDAG.java | 7 ++-- .../request/http/SaveWorkflowRequest.java | 3 +- .../powerjob/common/response/AskResponse.java | 2 +- .../powerjob/common/utils/JsonUtils.java | 8 +++-- .../powerjob/server/akka/OhMyServer.java | 6 ++-- .../powerjob/server/service/JobService.java | 2 +- .../tracker/processor/ProcessorTracker.java | 2 +- .../core/tracker/task/CommonTaskTracker.java | 4 +-- 12 files changed, 45 insertions(+), 29 deletions(-) diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index 31525b28..04efad72 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -9,6 +9,7 @@ import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.response.*; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.HttpUtils; +import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import okhttp3.FormBody; @@ -283,7 +284,8 @@ public class OhMyClient { public ResultDTO saveWorkflow(SaveWorkflowRequest request) throws PowerJobException { request.setAppId(appId); MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); - String json = JSONObject.toJSONString(request); + // 中坑记录:用 FastJSON 序列化会导致 Server 接收时 pEWorkflowDAG 为 null,无语.jpg + String json = JsonUtils.toJSONStringUnsafe(request); String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json)); return JSONObject.parseObject(post, LONG_RESULT_TYPE); } diff --git a/powerjob-client/src/test/java/TestClient.java b/powerjob-client/src/test/java/TestClient.java index dd469c99..f50aed98 100644 --- a/powerjob-client/src/test/java/TestClient.java +++ b/powerjob-client/src/test/java/TestClient.java @@ -21,6 +21,8 @@ public class TestClient { private static OhMyClient ohMyClient; + public static final long JOB_ID = 4L; + @BeforeAll public static void initClient() throws Exception { ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123"); @@ -30,7 +32,7 @@ public class TestClient { public void testSaveJob() throws Exception { SaveJobInfoRequest newJobInfo = new SaveJobInfoRequest(); -// newJobInfo.setId(8L); + newJobInfo.setId(JOB_ID); newJobInfo.setJobName("omsOpenAPIJobccccc"); newJobInfo.setJobDescription("tes OpenAPI"); newJobInfo.setJobParams("{'aa':'bb'}"); @@ -38,8 +40,8 @@ public class TestClient { newJobInfo.setTimeExpression("0 0 * * * ? "); newJobInfo.setExecuteType(ExecuteType.STANDALONE); newJobInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA); - newJobInfo.setProcessorInfo("com.github.kfcfans.oms.server.tester.OmsLogPerformanceTester"); - newJobInfo.setDesignatedWorkers("192.168.1.1:2777"); + newJobInfo.setProcessorInfo("com.github.kfcfans.powerjob.samples.processors.StandaloneProcessorDemo"); + newJobInfo.setDesignatedWorkers(""); newJobInfo.setMinCpuCores(1.1); newJobInfo.setMinMemorySpace(1.2); @@ -51,48 +53,53 @@ public class TestClient { @Test public void testFetchJob() throws Exception { - ResultDTO fetchJob = ohMyClient.fetchJob(1L); + ResultDTO fetchJob = ohMyClient.fetchJob(JOB_ID); System.out.println(JSONObject.toJSONString(fetchJob)); } @Test public void testDisableJob() throws Exception { - System.out.println(ohMyClient.disableJob(7L)); + System.out.println(ohMyClient.disableJob(JOB_ID)); } @Test public void testEnableJob() throws Exception { - System.out.println(ohMyClient.enableJob(7L)); + System.out.println(ohMyClient.enableJob(JOB_ID)); } @Test public void testDeleteJob() throws Exception { - System.out.println(ohMyClient.deleteJob(7L)); + System.out.println(ohMyClient.deleteJob(JOB_ID)); } @Test - public void testRunJob() throws Exception { - System.out.println(ohMyClient.runJob(6L, "this is instanceParams", 60000)); + public void testRun() { + System.out.println(ohMyClient.runJob(JOB_ID)); + } + + @Test + public void testRunJobDelay() throws Exception { + System.out.println(ohMyClient.runJob(JOB_ID, "this is instanceParams", 60000)); } @Test public void testFetchInstanceInfo() throws Exception { - System.out.println(ohMyClient.fetchInstanceInfo(141251409466097728L)); + System.out.println(ohMyClient.fetchInstanceInfo(205436386851946560L)); } @Test public void testStopInstance() throws Exception { - ResultDTO res = ohMyClient.stopInstance(141251409466097728L); + ResultDTO res = ohMyClient.stopInstance(205436995885858880L); System.out.println(res.toString()); } @Test public void testFetchInstanceStatus() throws Exception { - System.out.println(ohMyClient.fetchInstanceStatus(141251409466097728L)); + System.out.println(ohMyClient.fetchInstanceStatus(205436995885858880L)); } @Test public void testCancelInstanceInTimeWheel() throws Exception { - ResultDTO startRes = ohMyClient.runJob(15L, "start by OhMyClient", 20000); + ResultDTO startRes = ohMyClient.runJob(JOB_ID, "start by OhMyClient", 20000); System.out.println("runJob result: " + JSONObject.toJSONString(startRes)); ResultDTO cancelRes = ohMyClient.cancelInstance(startRes.getData()); System.out.println("cancelJob result: " + JSONObject.toJSONString(cancelRes)); diff --git a/powerjob-client/src/test/java/TestWorkflow.java b/powerjob-client/src/test/java/TestWorkflow.java index 9e04d80e..01cd6e8a 100644 --- a/powerjob-client/src/test/java/TestWorkflow.java +++ b/powerjob-client/src/test/java/TestWorkflow.java @@ -64,6 +64,7 @@ public class TestWorkflow { req.setEnable(true); req.setTimeExpressionType(TimeExpressionType.API); + System.out.println("req ->" + JSONObject.toJSON(req)); System.out.println(ohMyClient.saveWorkflow(req)); } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java index 3fa65ab0..814e104a 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java @@ -33,5 +33,5 @@ public class RemoteConstant { /* ************************ OTHERS ************************ */ public static final String EMPTY_ADDRESS = "N/A"; - public static final long DEFAULT_TIMEOUT_MS = 3000; + public static final long DEFAULT_TIMEOUT_MS = 5000; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java index afc967af..3fe258ca 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java @@ -9,6 +9,7 @@ import lombok.NoArgsConstructor; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.Serializable; import java.util.List; /** @@ -20,7 +21,7 @@ import java.util.List; */ @Data @NoArgsConstructor -public class PEWorkflowDAG { +public class PEWorkflowDAG implements Serializable { // DAG 图(点线表示法) private List nodes; @@ -30,7 +31,7 @@ public class PEWorkflowDAG { @Data @NoArgsConstructor @AllArgsConstructor - public static class Node { + public static class Node implements Serializable { private Long jobId; private String jobName; @@ -50,7 +51,7 @@ public class PEWorkflowDAG { @Data @NoArgsConstructor @AllArgsConstructor - public static class Edge { + public static class Edge implements Serializable { private Long from; private Long to; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java index 3e0e03e1..3bf4138a 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java @@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.google.common.collect.Lists; import lombok.Data; +import java.io.Serializable; import java.util.List; /** @@ -15,7 +16,7 @@ import java.util.List; * @since 2020/5/26 */ @Data -public class SaveWorkflowRequest { +public class SaveWorkflowRequest implements Serializable { private Long id; diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java index b41dba7d..5f6faa89 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java @@ -53,7 +53,7 @@ public class AskResponse implements OmsSerializable { return JsonUtils.parseObject(data, clz); } - public String getDataAsString() { + public String parseDataAsString() { return new String(data, StandardCharsets.UTF_8); } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java index 2e91357f..68cc5123 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java @@ -29,8 +29,12 @@ public class JsonUtils { return null; } - public static String toJSONStringUnsafe(Object obj) throws JsonProcessingException { - return objectMapper.writeValueAsString(obj); + public static String toJSONStringUnsafe(Object obj) { + try { + return objectMapper.writeValueAsString(obj); + }catch (Exception e) { + throw new PowerJobException(e); + } } public static byte[] toBytes(Object obj) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java index 513c9bf2..f818675a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java @@ -97,8 +97,8 @@ public class OhMyServer { } /** - * ASK 其他 powejob-server,要求 AskResponse 中的 Data 为 String - * @param address 其他 powejob-server 的地址(ip:port) + * ASK 其他 powerjob-server,要求 AskResponse 中的 Data 为 String + * @param address 其他 powerjob-server 的地址(ip:port) * @param request 请求 * @return 返回值 OR 异常 */ @@ -106,7 +106,7 @@ public class OhMyServer { CompletionStage askCS = Patterns.ask(getFriendActor(address), request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(); if (askResponse.isSuccess()) { - return askResponse.getDataAsString(); + return askResponse.parseDataAsString(); } throw new PowerJobException("remote server process failed:" + askResponse.getMessage()); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index cc6ee4c9..2e1854ab 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -142,7 +142,7 @@ public class JobService { dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); }); } - log.info("[Job-{}] run job successfully, params= {}, instanceId={}", jobInfo.getId(), instanceParams, instanceId); + log.info("[Job-{}] run job successfully, params={}, instanceId={}", jobInfo.getId(), instanceParams, instanceId); return instanceId; } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index c6b536b9..4c7e42e2 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -323,7 +323,7 @@ public class ProcessorTracker { if (processor == null) { log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo); - throw new PowerJobException("fetch Processor failed"); + throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config"); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index 513b8722..046b824a 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -236,8 +236,8 @@ public class CommonTaskTracker extends TaskTracker { try { AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); serverAccepted = askResponse.isSuccess(); - }catch (Exception ignore) { - log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result); + }catch (Exception e) { + log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result, e); } // 服务器未接受上报,则等待下次重新上报 From 5cf43ddb2a4d5d557397cc354f52b8bd3da84df6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctjq=E2=80=9D?= Date: Sat, 7 Nov 2020 22:04:04 +0800 Subject: [PATCH 06/10] feat: workflow instance return init params too #92 --- powerjob-client/src/test/java/TestWorkflow.java | 14 ++++++++------ .../common/response/WorkflowInstanceInfoDTO.java | 2 ++ .../web/response/WorkflowInstanceInfoVO.java | 2 ++ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/powerjob-client/src/test/java/TestWorkflow.java b/powerjob-client/src/test/java/TestWorkflow.java index 01cd6e8a..6feaf266 100644 --- a/powerjob-client/src/test/java/TestWorkflow.java +++ b/powerjob-client/src/test/java/TestWorkflow.java @@ -22,6 +22,8 @@ public class TestWorkflow { private static OhMyClient ohMyClient; + private static final long WF_ID = 1; + @BeforeAll public static void initClient() throws Exception { ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123"); @@ -70,27 +72,27 @@ public class TestWorkflow { @Test public void testDisableWorkflow() throws Exception { - System.out.println(ohMyClient.disableWorkflow(4L)); + System.out.println(ohMyClient.disableWorkflow(WF_ID)); } @Test public void testDeleteWorkflow() throws Exception { - System.out.println(ohMyClient.deleteWorkflow(4L)); + System.out.println(ohMyClient.deleteWorkflow(WF_ID)); } @Test public void testEnableWorkflow() throws Exception { - System.out.println(ohMyClient.enableWorkflow(4L)); + System.out.println(ohMyClient.enableWorkflow(WF_ID)); } @Test public void testFetchWorkflowInfo() throws Exception { - System.out.println(ohMyClient.fetchWorkflow(5L)); + System.out.println(ohMyClient.fetchWorkflow(WF_ID)); } @Test public void testRunWorkflow() throws Exception { - System.out.println(ohMyClient.runWorkflow(5L)); + System.out.println(ohMyClient.runWorkflow(WF_ID)); } @Test @@ -105,6 +107,6 @@ public class TestWorkflow { @Test public void testRunWorkflowPlus() throws Exception { - System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 2", 90000)); + System.out.println(ohMyClient.runWorkflow(WF_ID, "this is init Params 2", 90000)); } } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java index 610ce15b..c9a22df4 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java @@ -21,6 +21,8 @@ public class WorkflowInstanceInfoDTO { // workflow 状态(WorkflowInstanceStatus) private Integer status; + // 工作流启动参数 + private String wfInitParams; private String dag; private String result; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkflowInstanceInfoVO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkflowInstanceInfoVO.java index 5d05e1c2..20c55b06 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkflowInstanceInfoVO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkflowInstanceInfoVO.java @@ -26,6 +26,8 @@ public class WorkflowInstanceInfoVO { // workflow 状态(WorkflowInstanceStatus) private Integer status; + // 工作流启动参数 + private String wfInitParams; private PEWorkflowDAG pEWorkflowDAG; private String result; From d8062f810c4ce3f3c2d2aa8fa3d766c945eaa386 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctjq=E2=80=9D?= Date: Sat, 7 Nov 2020 22:09:38 +0800 Subject: [PATCH 07/10] fix: NPE when user use invalid params #87 --- .../powerjob/server/service/JobService.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 2e1854ab..8f7738c7 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -78,9 +78,8 @@ public class JobService { jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV()); jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV()); - if (jobInfoDO.getMaxWorkerCount() == null) { - jobInfoDO.setMaxWorkerCount(0); - } + // 填充默认值,非空保护防止 NPE + fillDefaultValue(jobInfoDO); // 转化报警用户列表 if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) { @@ -230,4 +229,13 @@ public class JobService { jobInfoDO.setGmtModified(now); } + private void fillDefaultValue(JobInfoDO jobInfoDO) { + if (jobInfoDO.getMaxWorkerCount() == null) { + jobInfoDO.setMaxWorkerCount(0); + } + if (jobInfoDO.getMaxInstanceNum() == null) { + jobInfoDO.setMaxInstanceNum(0); + } + } + } From 92c0f7b7bb12f4cc006d40a5f005c1c397fc02fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctjq=E2=80=9D?= Date: Sat, 7 Nov 2020 22:15:58 +0800 Subject: [PATCH 08/10] fix: NPE when user use invalid params #87 --- .../kfcfans/powerjob/server/service/JobService.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 8f7738c7..7c278a81 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -236,6 +236,15 @@ public class JobService { if (jobInfoDO.getMaxInstanceNum() == null) { jobInfoDO.setMaxInstanceNum(0); } + if (jobInfoDO.getConcurrency() == null) { + jobInfoDO.setConcurrency(0); + } + if (jobInfoDO.getInstanceRetryNum() == null) { + jobInfoDO.setInstanceRetryNum(0); + } + if (jobInfoDO.getTaskRetryNum() == null) { + jobInfoDO.setTaskRetryNum(0); + } } } From 8bed28d0c59d0a5d9014606e4f38a7c2abcdeff4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctjq=E2=80=9D?= Date: Sat, 7 Nov 2020 22:44:37 +0800 Subject: [PATCH 09/10] fix: workflow redirect run request bug --- .../github/kfcfans/powerjob/server/service/JobService.java | 4 ++-- .../powerjob/server/service/workflow/WorkflowService.java | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 7c278a81..fc345083 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -120,7 +120,7 @@ public class JobService { } // 转发请求 - log.info("[Job-{}] redirect run request to target server: {}", jobId, targetServer); + log.info("[Job-{}] redirect run request[params={}] to target server: {}", jobId, instanceParams, targetServer); RunJobOrWorkflowReq req = new RunJobOrWorkflowReq(RunJobOrWorkflowReq.JOB, jobId, delay, instanceParams, jobInfo.getAppId()); try { return Long.parseLong(OhMyServer.askFriend(targetServer, req)); @@ -237,7 +237,7 @@ public class JobService { jobInfoDO.setMaxInstanceNum(0); } if (jobInfoDO.getConcurrency() == null) { - jobInfoDO.setConcurrency(0); + jobInfoDO.setConcurrency(5); } if (jobInfoDO.getInstanceRetryNum() == null) { jobInfoDO.setInstanceRetryNum(0); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java index 81430898..51929452 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java @@ -22,6 +22,7 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Date; +import java.util.Objects; /** * Workflow 服务 @@ -149,12 +150,12 @@ public class WorkflowService { AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by appId: " + appId)); - String targetServer = OhMyServer.getActorSystemAddress(); - if (targetServer.equals(appInfo.getCurrentServer())) { + String targetServer = appInfo.getCurrentServer(); + if (Objects.equals(targetServer, OhMyServer.getActorSystemAddress())) { return realRunWorkflow(wfInfo, initParams, delay); } - log.info("[WorkflowService-{}] redirect run request to target server: {}", wfId, targetServer); + log.info("[WorkflowService-{}] redirect run request[initParams={}] to target server: {}", wfId, initParams, targetServer); // 转发请求 RunJobOrWorkflowReq req = new RunJobOrWorkflowReq(RunJobOrWorkflowReq.WORKFLOW, wfId, delay, initParams, appId); try { From c0c7942fc6c62df6eccd499c96bb2947d248371f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctjq=E2=80=9D?= Date: Sat, 7 Nov 2020 22:46:30 +0800 Subject: [PATCH 10/10] feat: change version to 3.3.2 --- powerjob-client/pom.xml | 4 ++-- powerjob-common/pom.xml | 2 +- powerjob-server/pom.xml | 4 ++-- powerjob-worker-agent/pom.xml | 4 ++-- powerjob-worker-samples/pom.xml | 4 ++-- powerjob-worker-spring-boot-starter/pom.xml | 4 ++-- powerjob-worker/pom.xml | 4 ++-- 7 files changed, 13 insertions(+), 13 deletions(-) diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index 987c1d41..c17c1dee 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-client - 3.3.1 + 3.3.2 jar 5.6.1 1.2.68 - 3.3.1 + 3.3.2 3.2.4 diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index 3bca0b62..6c9b9ec8 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.3.1 + 3.3.2 jar diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index a5e18322..e6e148e4 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.3.1 + 3.3.2 jar 2.9.2 2.3.4.RELEASE - 3.3.1 + 3.3.2 8.0.19 19.7.0.0 diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index e51219e0..e406c41a 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker-agent - 3.3.1 + 3.3.2 jar - 3.3.1 + 3.3.2 1.2.3 4.3.2 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index c34da9af..bda8400c 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-samples - 3.3.1 + 3.3.2 2.2.6.RELEASE - 3.3.1 + 3.3.2 1.2.68 diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index 6ddad2fa..882caa38 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-spring-boot-starter - 3.3.1 + 3.3.2 jar - 3.3.1 + 3.3.2 2.2.6.RELEASE diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index f00034e0..76c678c6 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker - 3.3.1 + 3.3.2 jar 5.2.4.RELEASE - 3.3.1 + 3.3.2 1.4.200 3.4.2 5.6.1