fix: OpenAPI's saveWorkflow

This commit is contained in:
“tjq” 2020-11-07 21:57:16 +08:00
parent 0cc7cc26b4
commit 7ac70defd3
12 changed files with 45 additions and 29 deletions

View File

@ -9,6 +9,7 @@ import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.*; import com.github.kfcfans.powerjob.common.response.*;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.HttpUtils; import com.github.kfcfans.powerjob.common.utils.HttpUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import okhttp3.FormBody; import okhttp3.FormBody;
@ -283,7 +284,8 @@ public class OhMyClient {
public ResultDTO<Long> saveWorkflow(SaveWorkflowRequest request) throws PowerJobException { public ResultDTO<Long> saveWorkflow(SaveWorkflowRequest request) throws PowerJobException {
request.setAppId(appId); request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JSONObject.toJSONString(request); // 中坑记录 FastJSON 序列化会导致 Server 接收时 pEWorkflowDAG null无语.jpg
String json = JsonUtils.toJSONStringUnsafe(request);
String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json)); String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json));
return JSONObject.parseObject(post, LONG_RESULT_TYPE); return JSONObject.parseObject(post, LONG_RESULT_TYPE);
} }

View File

@ -21,6 +21,8 @@ public class TestClient {
private static OhMyClient ohMyClient; private static OhMyClient ohMyClient;
public static final long JOB_ID = 4L;
@BeforeAll @BeforeAll
public static void initClient() throws Exception { public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123"); ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
@ -30,7 +32,7 @@ public class TestClient {
public void testSaveJob() throws Exception { public void testSaveJob() throws Exception {
SaveJobInfoRequest newJobInfo = new SaveJobInfoRequest(); SaveJobInfoRequest newJobInfo = new SaveJobInfoRequest();
// newJobInfo.setId(8L); newJobInfo.setId(JOB_ID);
newJobInfo.setJobName("omsOpenAPIJobccccc"); newJobInfo.setJobName("omsOpenAPIJobccccc");
newJobInfo.setJobDescription("tes OpenAPI"); newJobInfo.setJobDescription("tes OpenAPI");
newJobInfo.setJobParams("{'aa':'bb'}"); newJobInfo.setJobParams("{'aa':'bb'}");
@ -38,8 +40,8 @@ public class TestClient {
newJobInfo.setTimeExpression("0 0 * * * ? "); newJobInfo.setTimeExpression("0 0 * * * ? ");
newJobInfo.setExecuteType(ExecuteType.STANDALONE); newJobInfo.setExecuteType(ExecuteType.STANDALONE);
newJobInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA); newJobInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA);
newJobInfo.setProcessorInfo("com.github.kfcfans.oms.server.tester.OmsLogPerformanceTester"); newJobInfo.setProcessorInfo("com.github.kfcfans.powerjob.samples.processors.StandaloneProcessorDemo");
newJobInfo.setDesignatedWorkers("192.168.1.1:2777"); newJobInfo.setDesignatedWorkers("");
newJobInfo.setMinCpuCores(1.1); newJobInfo.setMinCpuCores(1.1);
newJobInfo.setMinMemorySpace(1.2); newJobInfo.setMinMemorySpace(1.2);
@ -51,48 +53,53 @@ public class TestClient {
@Test @Test
public void testFetchJob() throws Exception { public void testFetchJob() throws Exception {
ResultDTO<JobInfoDTO> fetchJob = ohMyClient.fetchJob(1L); ResultDTO<JobInfoDTO> fetchJob = ohMyClient.fetchJob(JOB_ID);
System.out.println(JSONObject.toJSONString(fetchJob)); System.out.println(JSONObject.toJSONString(fetchJob));
} }
@Test @Test
public void testDisableJob() throws Exception { public void testDisableJob() throws Exception {
System.out.println(ohMyClient.disableJob(7L)); System.out.println(ohMyClient.disableJob(JOB_ID));
} }
@Test @Test
public void testEnableJob() throws Exception { public void testEnableJob() throws Exception {
System.out.println(ohMyClient.enableJob(7L)); System.out.println(ohMyClient.enableJob(JOB_ID));
} }
@Test @Test
public void testDeleteJob() throws Exception { public void testDeleteJob() throws Exception {
System.out.println(ohMyClient.deleteJob(7L)); System.out.println(ohMyClient.deleteJob(JOB_ID));
} }
@Test @Test
public void testRunJob() throws Exception { public void testRun() {
System.out.println(ohMyClient.runJob(6L, "this is instanceParams", 60000)); System.out.println(ohMyClient.runJob(JOB_ID));
}
@Test
public void testRunJobDelay() throws Exception {
System.out.println(ohMyClient.runJob(JOB_ID, "this is instanceParams", 60000));
} }
@Test @Test
public void testFetchInstanceInfo() throws Exception { public void testFetchInstanceInfo() throws Exception {
System.out.println(ohMyClient.fetchInstanceInfo(141251409466097728L)); System.out.println(ohMyClient.fetchInstanceInfo(205436386851946560L));
} }
@Test @Test
public void testStopInstance() throws Exception { public void testStopInstance() throws Exception {
ResultDTO<Void> res = ohMyClient.stopInstance(141251409466097728L); ResultDTO<Void> res = ohMyClient.stopInstance(205436995885858880L);
System.out.println(res.toString()); System.out.println(res.toString());
} }
@Test @Test
public void testFetchInstanceStatus() throws Exception { public void testFetchInstanceStatus() throws Exception {
System.out.println(ohMyClient.fetchInstanceStatus(141251409466097728L)); System.out.println(ohMyClient.fetchInstanceStatus(205436995885858880L));
} }
@Test @Test
public void testCancelInstanceInTimeWheel() throws Exception { public void testCancelInstanceInTimeWheel() throws Exception {
ResultDTO<Long> startRes = ohMyClient.runJob(15L, "start by OhMyClient", 20000); ResultDTO<Long> startRes = ohMyClient.runJob(JOB_ID, "start by OhMyClient", 20000);
System.out.println("runJob result: " + JSONObject.toJSONString(startRes)); System.out.println("runJob result: " + JSONObject.toJSONString(startRes));
ResultDTO<Void> cancelRes = ohMyClient.cancelInstance(startRes.getData()); ResultDTO<Void> cancelRes = ohMyClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JSONObject.toJSONString(cancelRes)); System.out.println("cancelJob result: " + JSONObject.toJSONString(cancelRes));

View File

@ -64,6 +64,7 @@ public class TestWorkflow {
req.setEnable(true); req.setEnable(true);
req.setTimeExpressionType(TimeExpressionType.API); req.setTimeExpressionType(TimeExpressionType.API);
System.out.println("req ->" + JSONObject.toJSON(req));
System.out.println(ohMyClient.saveWorkflow(req)); System.out.println(ohMyClient.saveWorkflow(req));
} }

View File

@ -33,5 +33,5 @@ public class RemoteConstant {
/* ************************ OTHERS ************************ */ /* ************************ OTHERS ************************ */
public static final String EMPTY_ADDRESS = "N/A"; public static final String EMPTY_ADDRESS = "N/A";
public static final long DEFAULT_TIMEOUT_MS = 3000; public static final long DEFAULT_TIMEOUT_MS = 5000;
} }

View File

@ -9,6 +9,7 @@ import lombok.NoArgsConstructor;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.List; import java.util.List;
/** /**
@ -20,7 +21,7 @@ import java.util.List;
*/ */
@Data @Data
@NoArgsConstructor @NoArgsConstructor
public class PEWorkflowDAG { public class PEWorkflowDAG implements Serializable {
// DAG 点线表示法 // DAG 点线表示法
private List<Node> nodes; private List<Node> nodes;
@ -30,7 +31,7 @@ public class PEWorkflowDAG {
@Data @Data
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public static class Node { public static class Node implements Serializable {
private Long jobId; private Long jobId;
private String jobName; private String jobName;
@ -50,7 +51,7 @@ public class PEWorkflowDAG {
@Data @Data
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public static class Edge { public static class Edge implements Serializable {
private Long from; private Long from;
private Long to; private Long to;
} }

View File

@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.Data; import lombok.Data;
import java.io.Serializable;
import java.util.List; import java.util.List;
/** /**
@ -15,7 +16,7 @@ import java.util.List;
* @since 2020/5/26 * @since 2020/5/26
*/ */
@Data @Data
public class SaveWorkflowRequest { public class SaveWorkflowRequest implements Serializable {
private Long id; private Long id;

View File

@ -53,7 +53,7 @@ public class AskResponse implements OmsSerializable {
return JsonUtils.parseObject(data, clz); return JsonUtils.parseObject(data, clz);
} }
public String getDataAsString() { public String parseDataAsString() {
return new String(data, StandardCharsets.UTF_8); return new String(data, StandardCharsets.UTF_8);
} }

View File

@ -29,8 +29,12 @@ public class JsonUtils {
return null; return null;
} }
public static String toJSONStringUnsafe(Object obj) throws JsonProcessingException { public static String toJSONStringUnsafe(Object obj) {
return objectMapper.writeValueAsString(obj); try {
return objectMapper.writeValueAsString(obj);
}catch (Exception e) {
throw new PowerJobException(e);
}
} }
public static byte[] toBytes(Object obj) { public static byte[] toBytes(Object obj) {

View File

@ -97,8 +97,8 @@ public class OhMyServer {
} }
/** /**
* ASK 其他 powejob-server要求 AskResponse 中的 Data String * ASK 其他 powerjob-server要求 AskResponse 中的 Data String
* @param address 其他 powejob-server 的地址ip:port * @param address 其他 powerjob-server 的地址ip:port
* @param request 请求 * @param request 请求
* @return 返回值 OR 异常 * @return 返回值 OR 异常
*/ */
@ -106,7 +106,7 @@ public class OhMyServer {
CompletionStage<Object> askCS = Patterns.ask(getFriendActor(address), request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); CompletionStage<Object> askCS = Patterns.ask(getFriendActor(address), request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(); AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get();
if (askResponse.isSuccess()) { if (askResponse.isSuccess()) {
return askResponse.getDataAsString(); return askResponse.parseDataAsString();
} }
throw new PowerJobException("remote server process failed:" + askResponse.getMessage()); throw new PowerJobException("remote server process failed:" + askResponse.getMessage());
} }

View File

@ -142,7 +142,7 @@ public class JobService {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
}); });
} }
log.info("[Job-{}] run job successfully, params= {}, instanceId={}", jobInfo.getId(), instanceParams, instanceId); log.info("[Job-{}] run job successfully, params={}, instanceId={}", jobInfo.getId(), instanceParams, instanceId);
return instanceId; return instanceId;
} }

View File

@ -323,7 +323,7 @@ public class ProcessorTracker {
if (processor == null) { if (processor == null) {
log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo); log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo);
throw new PowerJobException("fetch Processor failed"); throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config");
} }
} }

View File

@ -236,8 +236,8 @@ public class CommonTaskTracker extends TaskTracker {
try { try {
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverAccepted = askResponse.isSuccess(); serverAccepted = askResponse.isSuccess();
}catch (Exception ignore) { }catch (Exception e) {
log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result); log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result, e);
} }
// 服务器未接受上报则等待下次重新上报 // 服务器未接受上报则等待下次重新上报