From 7ac70defd304f423eac3629b3208f3b324c89fba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctjq=E2=80=9D?= Date: Sat, 7 Nov 2020 21:57:16 +0800 Subject: [PATCH] fix: OpenAPI's saveWorkflow --- .../kfcfans/powerjob/client/OhMyClient.java | 4 ++- powerjob-client/src/test/java/TestClient.java | 33 +++++++++++-------- .../src/test/java/TestWorkflow.java | 1 + .../powerjob/common/RemoteConstant.java | 2 +- .../powerjob/common/model/PEWorkflowDAG.java | 7 ++-- .../request/http/SaveWorkflowRequest.java | 3 +- .../powerjob/common/response/AskResponse.java | 2 +- .../powerjob/common/utils/JsonUtils.java | 8 +++-- .../powerjob/server/akka/OhMyServer.java | 6 ++-- .../powerjob/server/service/JobService.java | 2 +- .../tracker/processor/ProcessorTracker.java | 2 +- .../core/tracker/task/CommonTaskTracker.java | 4 +-- 12 files changed, 45 insertions(+), 29 deletions(-) diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index 31525b28..04efad72 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -9,6 +9,7 @@ import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.response.*; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.HttpUtils; +import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import okhttp3.FormBody; @@ -283,7 +284,8 @@ public class OhMyClient { public ResultDTO saveWorkflow(SaveWorkflowRequest request) throws PowerJobException { request.setAppId(appId); MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); - String json = JSONObject.toJSONString(request); + // 中坑记录:用 FastJSON 序列化会导致 Server 接收时 pEWorkflowDAG 为 null,无语.jpg + String json = JsonUtils.toJSONStringUnsafe(request); String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json)); return JSONObject.parseObject(post, LONG_RESULT_TYPE); } diff --git a/powerjob-client/src/test/java/TestClient.java b/powerjob-client/src/test/java/TestClient.java index dd469c99..f50aed98 100644 --- a/powerjob-client/src/test/java/TestClient.java +++ b/powerjob-client/src/test/java/TestClient.java @@ -21,6 +21,8 @@ public class TestClient { private static OhMyClient ohMyClient; + public static final long JOB_ID = 4L; + @BeforeAll public static void initClient() throws Exception { ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123"); @@ -30,7 +32,7 @@ public class TestClient { public void testSaveJob() throws Exception { SaveJobInfoRequest newJobInfo = new SaveJobInfoRequest(); -// newJobInfo.setId(8L); + newJobInfo.setId(JOB_ID); newJobInfo.setJobName("omsOpenAPIJobccccc"); newJobInfo.setJobDescription("tes OpenAPI"); newJobInfo.setJobParams("{'aa':'bb'}"); @@ -38,8 +40,8 @@ public class TestClient { newJobInfo.setTimeExpression("0 0 * * * ? "); newJobInfo.setExecuteType(ExecuteType.STANDALONE); newJobInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA); - newJobInfo.setProcessorInfo("com.github.kfcfans.oms.server.tester.OmsLogPerformanceTester"); - newJobInfo.setDesignatedWorkers("192.168.1.1:2777"); + newJobInfo.setProcessorInfo("com.github.kfcfans.powerjob.samples.processors.StandaloneProcessorDemo"); + newJobInfo.setDesignatedWorkers(""); newJobInfo.setMinCpuCores(1.1); newJobInfo.setMinMemorySpace(1.2); @@ -51,48 +53,53 @@ public class TestClient { @Test public void testFetchJob() throws Exception { - ResultDTO fetchJob = ohMyClient.fetchJob(1L); + ResultDTO fetchJob = ohMyClient.fetchJob(JOB_ID); System.out.println(JSONObject.toJSONString(fetchJob)); } @Test public void testDisableJob() throws Exception { - System.out.println(ohMyClient.disableJob(7L)); + System.out.println(ohMyClient.disableJob(JOB_ID)); } @Test public void testEnableJob() throws Exception { - System.out.println(ohMyClient.enableJob(7L)); + System.out.println(ohMyClient.enableJob(JOB_ID)); } @Test public void testDeleteJob() throws Exception { - System.out.println(ohMyClient.deleteJob(7L)); + System.out.println(ohMyClient.deleteJob(JOB_ID)); } @Test - public void testRunJob() throws Exception { - System.out.println(ohMyClient.runJob(6L, "this is instanceParams", 60000)); + public void testRun() { + System.out.println(ohMyClient.runJob(JOB_ID)); + } + + @Test + public void testRunJobDelay() throws Exception { + System.out.println(ohMyClient.runJob(JOB_ID, "this is instanceParams", 60000)); } @Test public void testFetchInstanceInfo() throws Exception { - System.out.println(ohMyClient.fetchInstanceInfo(141251409466097728L)); + System.out.println(ohMyClient.fetchInstanceInfo(205436386851946560L)); } @Test public void testStopInstance() throws Exception { - ResultDTO res = ohMyClient.stopInstance(141251409466097728L); + ResultDTO res = ohMyClient.stopInstance(205436995885858880L); System.out.println(res.toString()); } @Test public void testFetchInstanceStatus() throws Exception { - System.out.println(ohMyClient.fetchInstanceStatus(141251409466097728L)); + System.out.println(ohMyClient.fetchInstanceStatus(205436995885858880L)); } @Test public void testCancelInstanceInTimeWheel() throws Exception { - ResultDTO startRes = ohMyClient.runJob(15L, "start by OhMyClient", 20000); + ResultDTO startRes = ohMyClient.runJob(JOB_ID, "start by OhMyClient", 20000); System.out.println("runJob result: " + JSONObject.toJSONString(startRes)); ResultDTO cancelRes = ohMyClient.cancelInstance(startRes.getData()); System.out.println("cancelJob result: " + JSONObject.toJSONString(cancelRes)); diff --git a/powerjob-client/src/test/java/TestWorkflow.java b/powerjob-client/src/test/java/TestWorkflow.java index 9e04d80e..01cd6e8a 100644 --- a/powerjob-client/src/test/java/TestWorkflow.java +++ b/powerjob-client/src/test/java/TestWorkflow.java @@ -64,6 +64,7 @@ public class TestWorkflow { req.setEnable(true); req.setTimeExpressionType(TimeExpressionType.API); + System.out.println("req ->" + JSONObject.toJSON(req)); System.out.println(ohMyClient.saveWorkflow(req)); } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java index 3fa65ab0..814e104a 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java @@ -33,5 +33,5 @@ public class RemoteConstant { /* ************************ OTHERS ************************ */ public static final String EMPTY_ADDRESS = "N/A"; - public static final long DEFAULT_TIMEOUT_MS = 3000; + public static final long DEFAULT_TIMEOUT_MS = 5000; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java index afc967af..3fe258ca 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java @@ -9,6 +9,7 @@ import lombok.NoArgsConstructor; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.Serializable; import java.util.List; /** @@ -20,7 +21,7 @@ import java.util.List; */ @Data @NoArgsConstructor -public class PEWorkflowDAG { +public class PEWorkflowDAG implements Serializable { // DAG 图(点线表示法) private List nodes; @@ -30,7 +31,7 @@ public class PEWorkflowDAG { @Data @NoArgsConstructor @AllArgsConstructor - public static class Node { + public static class Node implements Serializable { private Long jobId; private String jobName; @@ -50,7 +51,7 @@ public class PEWorkflowDAG { @Data @NoArgsConstructor @AllArgsConstructor - public static class Edge { + public static class Edge implements Serializable { private Long from; private Long to; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java index 3e0e03e1..3bf4138a 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java @@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.google.common.collect.Lists; import lombok.Data; +import java.io.Serializable; import java.util.List; /** @@ -15,7 +16,7 @@ import java.util.List; * @since 2020/5/26 */ @Data -public class SaveWorkflowRequest { +public class SaveWorkflowRequest implements Serializable { private Long id; diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java index b41dba7d..5f6faa89 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java @@ -53,7 +53,7 @@ public class AskResponse implements OmsSerializable { return JsonUtils.parseObject(data, clz); } - public String getDataAsString() { + public String parseDataAsString() { return new String(data, StandardCharsets.UTF_8); } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java index 2e91357f..68cc5123 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java @@ -29,8 +29,12 @@ public class JsonUtils { return null; } - public static String toJSONStringUnsafe(Object obj) throws JsonProcessingException { - return objectMapper.writeValueAsString(obj); + public static String toJSONStringUnsafe(Object obj) { + try { + return objectMapper.writeValueAsString(obj); + }catch (Exception e) { + throw new PowerJobException(e); + } } public static byte[] toBytes(Object obj) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java index 513c9bf2..f818675a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java @@ -97,8 +97,8 @@ public class OhMyServer { } /** - * ASK 其他 powejob-server,要求 AskResponse 中的 Data 为 String - * @param address 其他 powejob-server 的地址(ip:port) + * ASK 其他 powerjob-server,要求 AskResponse 中的 Data 为 String + * @param address 其他 powerjob-server 的地址(ip:port) * @param request 请求 * @return 返回值 OR 异常 */ @@ -106,7 +106,7 @@ public class OhMyServer { CompletionStage askCS = Patterns.ask(getFriendActor(address), request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(); if (askResponse.isSuccess()) { - return askResponse.getDataAsString(); + return askResponse.parseDataAsString(); } throw new PowerJobException("remote server process failed:" + askResponse.getMessage()); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index cc6ee4c9..2e1854ab 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -142,7 +142,7 @@ public class JobService { dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); }); } - log.info("[Job-{}] run job successfully, params= {}, instanceId={}", jobInfo.getId(), instanceParams, instanceId); + log.info("[Job-{}] run job successfully, params={}, instanceId={}", jobInfo.getId(), instanceParams, instanceId); return instanceId; } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index c6b536b9..4c7e42e2 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -323,7 +323,7 @@ public class ProcessorTracker { if (processor == null) { log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo); - throw new PowerJobException("fetch Processor failed"); + throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config"); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index 513b8722..046b824a 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -236,8 +236,8 @@ public class CommonTaskTracker extends TaskTracker { try { AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); serverAccepted = askResponse.isSuccess(); - }catch (Exception ignore) { - log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result); + }catch (Exception e) { + log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result, e); } // 服务器未接受上报,则等待下次重新上报