diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index 7221c891..540de951 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -1,7 +1,10 @@ package com.github.kfcfans.powerjob.client; -import com.alibaba.fastjson.JSONObject; -import com.github.kfcfans.powerjob.common.*; +import com.alibaba.fastjson.JSON; +import com.github.kfcfans.powerjob.common.InstanceStatus; +import com.github.kfcfans.powerjob.common.OmsConstant; +import com.github.kfcfans.powerjob.common.OpenAPIConstant; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.request.http.*; import com.github.kfcfans.powerjob.common.request.query.JobInfoQuery; import com.github.kfcfans.powerjob.common.response.*; @@ -66,7 +69,7 @@ public class OhMyClient { try { String result = assertApp(appName, password, url); if (StringUtils.isNotEmpty(result)) { - ResultDTO resultDTO = JSONObject.parseObject(result, LONG_RESULT_TYPE); + ResultDTO resultDTO = JSON.parseObject(result, LONG_RESULT_TYPE); if (resultDTO.isSuccess()) { appId = resultDTO.getData(); currentAddress = addr; @@ -76,6 +79,7 @@ public class OhMyClient { } } } catch (IOException ignore) { + // } } @@ -112,9 +116,9 @@ public class OhMyClient { request.setAppId(appId); MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE); - String json = JSONObject.toJSONString(request); + String json = JSON.toJSONString(request); String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json)); - return JSONObject.parseObject(post, LONG_RESULT_TYPE); + return JSON.parseObject(post, LONG_RESULT_TYPE); } /** @@ -129,7 +133,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_JOB, body); - return JSONObject.parseObject(post, JOB_RESULT_TYPE); + return JSON.parseObject(post, JOB_RESULT_TYPE); } /** @@ -142,7 +146,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_ALL_JOB, body); - return JSONObject.parseObject(post, LIST_JOB_RESULT_TYPE); + return JSON.parseObject(post, LIST_JOB_RESULT_TYPE); } /** @@ -156,7 +160,7 @@ public class OhMyClient { MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE); String json = JsonUtils.toJSONStringUnsafe(powerQuery); String post = postHA(OpenAPIConstant.QUERY_JOB, RequestBody.create(jsonType, json)); - return JSONObject.parseObject(post, LIST_JOB_RESULT_TYPE); + return JSON.parseObject(post, LIST_JOB_RESULT_TYPE); } /** @@ -171,7 +175,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.DISABLE_JOB, body); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); } /** @@ -186,7 +190,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.ENABLE_JOB, body); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); } /** @@ -201,7 +205,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.DELETE_JOB, body); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); } /** @@ -222,7 +226,7 @@ public class OhMyClient { builder.add("instanceParams", instanceParams); } String post = postHA(OpenAPIConstant.RUN_JOB, builder.build()); - return JSONObject.parseObject(post, LONG_RESULT_TYPE); + return JSON.parseObject(post, LONG_RESULT_TYPE); } public ResultDTO runJob(Long jobId) { @@ -243,7 +247,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.STOP_INSTANCE, body); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); } /** @@ -259,7 +263,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.CANCEL_INSTANCE, body); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); } /** @@ -275,7 +279,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.RETRY_INSTANCE, body); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); } /** @@ -289,7 +293,7 @@ public class OhMyClient { .add("instanceId", instanceId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_INSTANCE_STATUS, body); - return JSONObject.parseObject(post, INTEGER_RESULT_TYPE); + return JSON.parseObject(post, INTEGER_RESULT_TYPE); } /** @@ -303,7 +307,7 @@ public class OhMyClient { .add("instanceId", instanceId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_INSTANCE_INFO, body); - return JSONObject.parseObject(post, INSTANCE_RESULT_TYPE); + return JSON.parseObject(post, INSTANCE_RESULT_TYPE); } /* ************* Workflow API list ************* */ @@ -321,7 +325,7 @@ public class OhMyClient { // 中坑记录:用 FastJSON 序列化会导致 Server 接收时 pEWorkflowDAG 为 null,无语.jpg String json = JsonUtils.toJSONStringUnsafe(request); String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json)); - return JSONObject.parseObject(post, LONG_RESULT_TYPE); + return JSON.parseObject(post, LONG_RESULT_TYPE); } /** @@ -335,7 +339,7 @@ public class OhMyClient { MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE); String json = JsonUtils.toJSONStringUnsafe(request); String post = postHA(OpenAPIConstant.SAVE_WORKFLOW_DAG, RequestBody.create(jsonType, json)); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); } /** @@ -351,7 +355,7 @@ public class OhMyClient { MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE); String json = JsonUtils.toJSONStringUnsafe(requestList); String post = postHA(OpenAPIConstant.ADD_WORKFLOW_NODE, RequestBody.create(jsonType, json)); - return JSONObject.parseObject(post, WF_NODE_LIST_RESULT_TYPE); + return JSON.parseObject(post, WF_NODE_LIST_RESULT_TYPE); } /** @@ -365,7 +369,7 @@ public class OhMyClient { MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE); String json = JsonUtils.toJSONStringUnsafe(request); String post = postHA(OpenAPIConstant.MODIFY_WORKFLOW_NODE, RequestBody.create(jsonType, json)); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); } @@ -381,7 +385,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_WORKFLOW, body); - return JSONObject.parseObject(post, WF_RESULT_TYPE); + return JSON.parseObject(post, WF_RESULT_TYPE); } /** @@ -396,7 +400,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.DISABLE_WORKFLOW, body); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); } /** @@ -411,7 +415,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.ENABLE_WORKFLOW, body); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); } /** @@ -426,7 +430,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.DELETE_WORKFLOW, body); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); } /** @@ -446,7 +450,7 @@ public class OhMyClient { builder.add("initParams", initParams); } String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build()); - return JSONObject.parseObject(post, LONG_RESULT_TYPE); + return JSON.parseObject(post, LONG_RESULT_TYPE); } public ResultDTO runWorkflow(Long workflowId) { @@ -467,7 +471,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.STOP_WORKFLOW_INSTANCE, body); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); } /** @@ -482,7 +486,24 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.RETRY_WORKFLOW_INSTANCE, body); - return JSONObject.parseObject(post, VOID_RESULT_TYPE); + return JSON.parseObject(post, VOID_RESULT_TYPE); + } + + /** + * mark the workflow node as success + * + * @param wfInstanceId workflow instanceId + * @param nodeId node id + * @return Standard return object + */ + public ResultDTO markWorkflowNodeAsSuccess(Long wfInstanceId,Long nodeId) { + RequestBody body = new FormBody.Builder() + .add("wfInstanceId", wfInstanceId.toString()) + .add("nodeId", nodeId.toString()) + .add("appId", appId.toString()) + .build(); + String post = postHA(OpenAPIConstant.MARK_WORKFLOW_NODE_AS_SUCCESS, body); + return JSON.parseObject(post, VOID_RESULT_TYPE); } /** @@ -497,7 +518,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO, body); - return JSONObject.parseObject(post, WF_INSTANCE_RESULT_TYPE); + return JSON.parseObject(post, WF_INSTANCE_RESULT_TYPE); } diff --git a/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestWorkflow.java b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestWorkflow.java index 5dbd9377..76e12ec7 100644 --- a/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestWorkflow.java +++ b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestWorkflow.java @@ -23,7 +23,7 @@ class TestWorkflow extends ClientInitializer { private static final long WF_ID = 1; @Test - public void initTestData() throws Exception { + void initTestData() throws Exception { SaveJobInfoRequest base = new SaveJobInfoRequest(); base.setJobName("DAG-Node-"); base.setTimeExpressionType(TimeExpressionType.WORKFLOW); @@ -39,7 +39,7 @@ class TestWorkflow extends ClientInitializer { } @Test - public void testSaveWorkflow() throws Exception { + void testSaveWorkflow() throws Exception { SaveWorkflowRequest req = new SaveWorkflowRequest(); @@ -55,7 +55,7 @@ class TestWorkflow extends ClientInitializer { } @Test - public void testAddWorkflowNode(){ + void testAddWorkflowNode() { AddWorkflowNodeRequest addWorkflowNodeRequest = new AddWorkflowNodeRequest(); addWorkflowNodeRequest.setJobId(1L); addWorkflowNodeRequest.setWorkflowId(WF_ID); @@ -63,7 +63,7 @@ class TestWorkflow extends ClientInitializer { } @Test - public void testModifyWorkflowNode() { + void testModifyWorkflowNode() { ModifyWorkflowNodeRequest modifyWorkflowNodeRequest = new ModifyWorkflowNodeRequest(); modifyWorkflowNodeRequest.setWorkflowId(WF_ID); modifyWorkflowNodeRequest.setId(1L); @@ -74,7 +74,7 @@ class TestWorkflow extends ClientInitializer { } @Test - public void testSaveWorkflowDag() { + void testSaveWorkflowDag() { // DAG 图 List nodes = Lists.newLinkedList(); List edges = Lists.newLinkedList(); @@ -96,50 +96,53 @@ class TestWorkflow extends ClientInitializer { } - - @Test - public void testDisableWorkflow() throws Exception { + void testDisableWorkflow() throws Exception { System.out.println(ohMyClient.disableWorkflow(WF_ID)); } @Test - public void testDeleteWorkflow() throws Exception { + void testDeleteWorkflow() throws Exception { System.out.println(ohMyClient.deleteWorkflow(WF_ID)); } @Test - public void testEnableWorkflow() throws Exception { + void testEnableWorkflow() throws Exception { System.out.println(ohMyClient.enableWorkflow(WF_ID)); } @Test - public void testFetchWorkflowInfo() throws Exception { + void testFetchWorkflowInfo() throws Exception { System.out.println(ohMyClient.fetchWorkflow(WF_ID)); } @Test - public void testRunWorkflow() throws Exception { + void testRunWorkflow() throws Exception { System.out.println(ohMyClient.runWorkflow(WF_ID)); } @Test - public void testStopWorkflowInstance() throws Exception { + void testStopWorkflowInstance() throws Exception { System.out.println(ohMyClient.stopWorkflowInstance(149962433421639744L)); } @Test - public void testRetryWorkflowInstance() { + void testRetryWorkflowInstance() { System.out.println(ohMyClient.retryWorkflowInstance(149962433421639744L)); } @Test - public void testFetchWfInstanceInfo() throws Exception { + void testMarkWorkflowNodeAsSuccess() { + System.out.println(ohMyClient.markWorkflowNodeAsSuccess(149962433421639744L, 1L)); + } + + @Test + void testFetchWfInstanceInfo() throws Exception { System.out.println(ohMyClient.fetchWorkflowInstanceInfo(149962433421639744L)); } @Test - public void testRunWorkflowPlus() throws Exception { + void testRunWorkflowPlus() throws Exception { System.out.println(ohMyClient.runWorkflow(WF_ID, "this is init Params 2", 90000)); } } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OpenAPIConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OpenAPIConstant.java index 504f8552..4ba58842 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OpenAPIConstant.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OpenAPIConstant.java @@ -53,4 +53,5 @@ public class OpenAPIConstant { public static final String STOP_WORKFLOW_INSTANCE = "/stopWfInstance"; public static final String RETRY_WORKFLOW_INSTANCE = "/retryWfInstance"; public static final String FETCH_WORKFLOW_INSTANCE_INFO = "/fetchWfInstanceInfo"; + public static final String MARK_WORKFLOW_NODE_AS_SUCCESS = "/markWorkflowNodeAsSuccess"; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java index 7300cea0..99254a52 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java @@ -61,4 +61,8 @@ public class SystemInstanceResult { * 被禁用的节点 */ public static final String DISABLE_NODE = "disable node"; + /** + * 标记为成功的节点 + */ + public static final String MARK_AS_SUCCESSFUL_NODE = "mark as successful node"; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/WorkflowInstanceStatus.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/WorkflowInstanceStatus.java index 7bba8d25..ca2cd6d9 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/WorkflowInstanceStatus.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/WorkflowInstanceStatus.java @@ -15,20 +15,27 @@ import java.util.List; @Getter @AllArgsConstructor public enum WorkflowInstanceStatus { - + /** + * 初始状态为等待调度 + */ WAITING(1, "等待调度"), RUNNING(2, "运行中"), FAILED(3, "失败"), SUCCEED(4, "成功"), STOPPED(10, "手动停止"); - // 广义的运行状态 - public static final List generalizedRunningStatus = Lists.newArrayList(WAITING.v, RUNNING.v); - // 结束状态 - public static final List finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v); + /** + * 广义的运行状态 + */ + public static final List GENERALIZED_RUNNING_STATUS = Lists.newArrayList(WAITING.v, RUNNING.v); + /** + * 结束状态 + */ + public static final List FINISHED_STATUS = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v); - private int v; - private String des; + private final int v; + + private final String des; public static WorkflowInstanceStatus of(int v) { for (WorkflowInstanceStatus is : values()) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index fd2741ab..ee8b3a00 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -168,7 +168,7 @@ public class CleanService { } try { Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay); - int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, WorkflowInstanceStatus.finishedStatus); + int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, WorkflowInstanceStatus.FINISHED_STATUS); log.info("[CleanService] deleted {} workflow instanceInfo records whose modify time before {}.", num, t); }catch (Exception e) { log.warn("[CleanService] clean workflow instanceInfo failed.", e); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index 8d4a5fd0..f6917b88 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -163,7 +163,7 @@ public class WorkflowInstanceManager { } // 并发度控制 - int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.generalizedRunningStatus); + int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS); if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) { onWorkflowInstanceFailed(String.format(SystemInstanceResult.TOO_MANY_INSTANCES, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), wfInstanceInfo); return; @@ -237,7 +237,7 @@ public class WorkflowInstanceManager { Long wfId = wfInstance.getWorkflowId(); // 特殊处理手动终止 且 工作流实例已经不在运行状态的情况 - if (status == InstanceStatus.STOPPED && !WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { + if (status == InstanceStatus.STOPPED && !WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { // 由用户手动停止工作流实例导致,不需要任何操作 return; } @@ -267,7 +267,7 @@ public class WorkflowInstanceManager { wfInstance.setGmtModified(new Date()); wfInstance.setDag(JSON.toJSONString(dag)); // 工作流已经结束(某个节点失败导致工作流整体已经失败),仅更新最新的DAG图 - if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { + if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { workflowInstanceInfoRepository.saveAndFlush(wfInstance); log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", wfId, wfInstanceId, wfInstance.getStatus()); return; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java index d3e77387..6a4ad32a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java @@ -54,7 +54,7 @@ public class WorkflowInstanceService { */ public void stopWorkflowInstance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); - if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { + if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { throw new PowerJobException("workflow instance already stopped"); } // 停止所有已启动且未完成的服务 @@ -91,7 +91,7 @@ public class WorkflowInstanceService { public void retryWorkflowInstance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); // 仅允许重试 失败的工作流 - if (WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { + if (WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { throw new PowerJobException("workflow instance is running"); } if (wfInstance.getStatus() == WorkflowInstanceStatus.SUCCEED.getV()) { @@ -145,4 +145,53 @@ public class WorkflowInstanceService { return wfInstance; } + /** + * Add by Echo009 on 2021/02/20 + * 将节点标记成功 + * 注意:这里仅能标记真正执行失败的且不允许跳过的节点 + * 即处于 [失败且不允许跳过] 的节点 + * 而且仅会操作工作流实例 DAG 中的节点信息(状态、result) + * 并不会改变对应任务实例中的任何信息 + * + * @param wfInstanceId 工作流实例 ID + * @param nodeId 节点 ID + */ + public void markNodeAsSuccess(Long appId, Long wfInstanceId, Long nodeId) { + + WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); + // 校验工作流实例状态,运行中的不允许处理, + if (WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { + throw new PowerJobException("you can't mark the node in a running workflow!"); + } + // 这里一定能反序列化成功 + PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); + PEWorkflowDAG.Node targetNode = null; + for (PEWorkflowDAG.Node node : dag.getNodes()) { + if (node.getNodeId().equals(nodeId)) { + targetNode = node; + break; + } + } + if (targetNode == null) { + throw new PowerJobException("can't find the node in current DAG!"); + } + boolean allowSkipWhenFailed = targetNode.getSkipWhenFailed() != null && targetNode.getSkipWhenFailed(); + // 仅允许处理 执行失败的且不允许失败跳过的节点 + if (targetNode.getInstanceId() != null + && targetNode.getStatus() == InstanceStatus.FAILED.getV() + // 不允许失败跳过 + && !allowSkipWhenFailed) { + // 仅处理工作流实例中的节点信息 + targetNode.setStatus(InstanceStatus.SUCCEED.getV()) + .setResult(SystemInstanceResult.MARK_AS_SUCCESSFUL_NODE); + + wfInstance.setDag(JSON.toJSONString(dag)); + wfInstanceInfoRepository.saveAndFlush(wfInstance); + return; + } + // 其他情况均拒绝处理 + throw new PowerJobException("you can only mark the node which is failed and not allow to skip!"); + + } + } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java index e2b3dff5..d810e62b 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java @@ -208,6 +208,12 @@ public class OpenAPIController { return ResultDTO.success(null); } + @PostMapping(OpenAPIConstant.MARK_WORKFLOW_NODE_AS_SUCCESS) + public ResultDTO markWorkflowNodeAsSuccess(Long wfInstanceId,Long nodeId, Long appId) { + workflowInstanceService.markNodeAsSuccess(appId,wfInstanceId,nodeId); + return ResultDTO.success(null); + } + @PostMapping(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO) public ResultDTO fetchWorkflowInstanceInfo(Long wfInstanceId, Long appId) { return ResultDTO.success(workflowInstanceService.fetchWorkflowInstanceInfo(wfInstanceId, appId)); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowInstanceController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowInstanceController.java index 1878012e..8144fd6f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowInstanceController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowInstanceController.java @@ -44,11 +44,18 @@ public class WorkflowInstanceController { } @RequestMapping("/retry") - public ResultDTO retryWfInstance(Long wfInstanceId, Long appId){ + public ResultDTO retryWfInstance(Long wfInstanceId, Long appId) { workflowInstanceService.retryWorkflowInstance(wfInstanceId, appId); return ResultDTO.success(null); } + @RequestMapping("/markNodeAsSuccess") + public ResultDTO markNodeAsSuccess(Long wfInstanceId, Long appId, Long nodeId) { + workflowInstanceService.markNodeAsSuccess(appId, wfInstanceId, nodeId); + return ResultDTO.success(null); + } + + @GetMapping("/info") public ResultDTO getInfo(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstanceDO = workflowInstanceService.fetchWfInstance(wfInstanceId, appId); diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java index 0e6306e0..a0ee278e 100644 --- a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java +++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java @@ -111,7 +111,7 @@ public class RepositoryTest { @Test public void testDeleteWorkflowInstanceInfo() { - workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), WorkflowInstanceStatus.finishedStatus); + workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), WorkflowInstanceStatus.FINISHED_STATUS); } }