Mirror of https://github.com/PowerJob/PowerJob.git (synced 2025-07-17 00:00:04 +08:00)
feat: support marking workflow nodes as success (工作流节点支持标记成功)
Parent: 79a61454f3
Commit: 823a47303b
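In short: the commit adds a markWorkflowNodeAsSuccess method to the OpenAPI client (OhMyClient), a matching /markWorkflowNodeAsSuccess endpoint on the server, and WorkflowInstanceService#markNodeAsSuccess, which flips a failed, non-skippable node to SUCCEED inside the stored workflow-instance DAG without touching the underlying job instance. A minimal usage sketch of the new client call follows; the server address, appName, password and IDs are placeholders, and the (address, appName, password) constructor is assumed from the client module rather than shown in this diff.

import com.github.kfcfans.powerjob.client.OhMyClient;
import com.github.kfcfans.powerjob.common.response.ResultDTO;

public class MarkNodeAsSuccessDemo {
    public static void main(String[] args) throws Exception {
        // Placeholders: server address, appName and password are not part of this commit.
        OhMyClient ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
        // Mark node 1 of workflow instance 149962433421639744 as successful.
        ResultDTO<Void> res = ohMyClient.markWorkflowNodeAsSuccess(149962433421639744L, 1L);
        System.out.println(res);
    }
}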
@@ -1,7 +1,10 @@
 package com.github.kfcfans.powerjob.client;

-import com.alibaba.fastjson.JSONObject;
-import com.github.kfcfans.powerjob.common.*;
+import com.alibaba.fastjson.JSON;
+import com.github.kfcfans.powerjob.common.InstanceStatus;
+import com.github.kfcfans.powerjob.common.OmsConstant;
+import com.github.kfcfans.powerjob.common.OpenAPIConstant;
+import com.github.kfcfans.powerjob.common.PowerJobException;
 import com.github.kfcfans.powerjob.common.request.http.*;
 import com.github.kfcfans.powerjob.common.request.query.JobInfoQuery;
 import com.github.kfcfans.powerjob.common.response.*;
@@ -66,7 +69,7 @@ public class OhMyClient {
             try {
                 String result = assertApp(appName, password, url);
                 if (StringUtils.isNotEmpty(result)) {
-                    ResultDTO<Long> resultDTO = JSONObject.parseObject(result, LONG_RESULT_TYPE);
+                    ResultDTO<Long> resultDTO = JSON.parseObject(result, LONG_RESULT_TYPE);
                     if (resultDTO.isSuccess()) {
                         appId = resultDTO.getData();
                         currentAddress = addr;
@@ -76,6 +79,7 @@ public class OhMyClient {
                     }
                 }
             } catch (IOException ignore) {
                 //
             }
         }

@@ -112,9 +116,9 @@ public class OhMyClient {

         request.setAppId(appId);
         MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
-        String json = JSONObject.toJSONString(request);
+        String json = JSON.toJSONString(request);
         String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json));
-        return JSONObject.parseObject(post, LONG_RESULT_TYPE);
+        return JSON.parseObject(post, LONG_RESULT_TYPE);
     }

     /**
@@ -129,7 +133,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.FETCH_JOB, body);
-        return JSONObject.parseObject(post, JOB_RESULT_TYPE);
+        return JSON.parseObject(post, JOB_RESULT_TYPE);
     }

     /**
@@ -142,7 +146,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.FETCH_ALL_JOB, body);
-        return JSONObject.parseObject(post, LIST_JOB_RESULT_TYPE);
+        return JSON.parseObject(post, LIST_JOB_RESULT_TYPE);
     }

     /**
@@ -156,7 +160,7 @@ public class OhMyClient {
         MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
         String json = JsonUtils.toJSONStringUnsafe(powerQuery);
         String post = postHA(OpenAPIConstant.QUERY_JOB, RequestBody.create(jsonType, json));
-        return JSONObject.parseObject(post, LIST_JOB_RESULT_TYPE);
+        return JSON.parseObject(post, LIST_JOB_RESULT_TYPE);
     }

     /**
@@ -171,7 +175,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.DISABLE_JOB, body);
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }

     /**
@@ -186,7 +190,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.ENABLE_JOB, body);
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }

     /**
@@ -201,7 +205,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.DELETE_JOB, body);
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }

     /**
@@ -222,7 +226,7 @@ public class OhMyClient {
             builder.add("instanceParams", instanceParams);
         }
         String post = postHA(OpenAPIConstant.RUN_JOB, builder.build());
-        return JSONObject.parseObject(post, LONG_RESULT_TYPE);
+        return JSON.parseObject(post, LONG_RESULT_TYPE);
     }

     public ResultDTO<Long> runJob(Long jobId) {
@@ -243,7 +247,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.STOP_INSTANCE, body);
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }

     /**
@@ -259,7 +263,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.CANCEL_INSTANCE, body);
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }

     /**
@@ -275,7 +279,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.RETRY_INSTANCE, body);
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }

     /**
@@ -289,7 +293,7 @@ public class OhMyClient {
                 .add("instanceId", instanceId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.FETCH_INSTANCE_STATUS, body);
-        return JSONObject.parseObject(post, INTEGER_RESULT_TYPE);
+        return JSON.parseObject(post, INTEGER_RESULT_TYPE);
     }

     /**
@@ -303,7 +307,7 @@ public class OhMyClient {
                 .add("instanceId", instanceId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.FETCH_INSTANCE_INFO, body);
-        return JSONObject.parseObject(post, INSTANCE_RESULT_TYPE);
+        return JSON.parseObject(post, INSTANCE_RESULT_TYPE);
     }

     /* ************* Workflow API list ************* */
@@ -321,7 +325,7 @@ public class OhMyClient {
         // Gotcha: serializing the request with FastJSON makes pEWorkflowDAG arrive as null on the server side
         String json = JsonUtils.toJSONStringUnsafe(request);
         String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json));
-        return JSONObject.parseObject(post, LONG_RESULT_TYPE);
+        return JSON.parseObject(post, LONG_RESULT_TYPE);
     }

     /**
@@ -335,7 +339,7 @@ public class OhMyClient {
         MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
         String json = JsonUtils.toJSONStringUnsafe(request);
         String post = postHA(OpenAPIConstant.SAVE_WORKFLOW_DAG, RequestBody.create(jsonType, json));
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }

     /**
@@ -351,7 +355,7 @@ public class OhMyClient {
         MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
         String json = JsonUtils.toJSONStringUnsafe(requestList);
         String post = postHA(OpenAPIConstant.ADD_WORKFLOW_NODE, RequestBody.create(jsonType, json));
-        return JSONObject.parseObject(post, WF_NODE_LIST_RESULT_TYPE);
+        return JSON.parseObject(post, WF_NODE_LIST_RESULT_TYPE);
     }

     /**
@@ -365,7 +369,7 @@ public class OhMyClient {
         MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
         String json = JsonUtils.toJSONStringUnsafe(request);
         String post = postHA(OpenAPIConstant.MODIFY_WORKFLOW_NODE, RequestBody.create(jsonType, json));
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }


@@ -381,7 +385,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.FETCH_WORKFLOW, body);
-        return JSONObject.parseObject(post, WF_RESULT_TYPE);
+        return JSON.parseObject(post, WF_RESULT_TYPE);
     }

     /**
@@ -396,7 +400,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.DISABLE_WORKFLOW, body);
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }

     /**
@@ -411,7 +415,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.ENABLE_WORKFLOW, body);
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }

     /**
@@ -426,7 +430,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.DELETE_WORKFLOW, body);
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }

     /**
@@ -446,7 +450,7 @@ public class OhMyClient {
             builder.add("initParams", initParams);
         }
         String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build());
-        return JSONObject.parseObject(post, LONG_RESULT_TYPE);
+        return JSON.parseObject(post, LONG_RESULT_TYPE);
     }

     public ResultDTO<Long> runWorkflow(Long workflowId) {
@@ -467,7 +471,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.STOP_WORKFLOW_INSTANCE, body);
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }

     /**
@@ -482,7 +486,24 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.RETRY_WORKFLOW_INSTANCE, body);
-        return JSONObject.parseObject(post, VOID_RESULT_TYPE);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
     }

+    /**
+     * mark the workflow node as success
+     *
+     * @param wfInstanceId workflow instanceId
+     * @param nodeId node id
+     * @return Standard return object
+     */
+    public ResultDTO<Void> markWorkflowNodeAsSuccess(Long wfInstanceId, Long nodeId) {
+        RequestBody body = new FormBody.Builder()
+                .add("wfInstanceId", wfInstanceId.toString())
+                .add("nodeId", nodeId.toString())
+                .add("appId", appId.toString())
+                .build();
+        String post = postHA(OpenAPIConstant.MARK_WORKFLOW_NODE_AS_SUCCESS, body);
+        return JSON.parseObject(post, VOID_RESULT_TYPE);
+    }
+
     /**
@@ -497,7 +518,7 @@ public class OhMyClient {
                 .add("appId", appId.toString())
                 .build();
         String post = postHA(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO, body);
-        return JSONObject.parseObject(post, WF_INSTANCE_RESULT_TYPE);
+        return JSON.parseObject(post, WF_INSTANCE_RESULT_TYPE);
     }


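The new client method above is a plain form POST (fields wfInstanceId, nodeId, appId) against the MARK_WORKFLOW_NODE_AS_SUCCESS path defined further down in OpenAPIConstant. A rough OkHttp equivalent is sketched below for calling the OpenAPI without the Java client; the server address and the /openApi prefix are assumptions about a typical deployment, not something this diff states.

import okhttp3.FormBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

public class MarkNodeAsSuccessRawCall {
    public static void main(String[] args) throws Exception {
        OkHttpClient client = new OkHttpClient();
        // Same form fields the client builds: wfInstanceId, nodeId, appId (values are placeholders).
        RequestBody body = new FormBody.Builder()
                .add("wfInstanceId", "149962433421639744")
                .add("nodeId", "1")
                .add("appId", "1")
                .build();
        Request request = new Request.Builder()
                // Assumed server address and OpenAPI prefix.
                .url("http://127.0.0.1:7700/openApi/markWorkflowNodeAsSuccess")
                .post(body)
                .build();
        try (Response response = client.newCall(request).execute()) {
            // The server answers with a JSON-serialized ResultDTO<Void>.
            System.out.println(response.body().string());
        }
    }
}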
@@ -23,7 +23,7 @@ class TestWorkflow extends ClientInitializer {
     private static final long WF_ID = 1;

     @Test
-    public void initTestData() throws Exception {
+    void initTestData() throws Exception {
         SaveJobInfoRequest base = new SaveJobInfoRequest();
         base.setJobName("DAG-Node-");
         base.setTimeExpressionType(TimeExpressionType.WORKFLOW);
@@ -39,7 +39,7 @@ class TestWorkflow extends ClientInitializer {
     }

     @Test
-    public void testSaveWorkflow() throws Exception {
+    void testSaveWorkflow() throws Exception {


         SaveWorkflowRequest req = new SaveWorkflowRequest();
@@ -55,7 +55,7 @@ class TestWorkflow extends ClientInitializer {
     }

     @Test
-    public void testAddWorkflowNode(){
+    void testAddWorkflowNode() {
         AddWorkflowNodeRequest addWorkflowNodeRequest = new AddWorkflowNodeRequest();
         addWorkflowNodeRequest.setJobId(1L);
         addWorkflowNodeRequest.setWorkflowId(WF_ID);
@@ -63,7 +63,7 @@ class TestWorkflow extends ClientInitializer {
     }

     @Test
-    public void testModifyWorkflowNode() {
+    void testModifyWorkflowNode() {
         ModifyWorkflowNodeRequest modifyWorkflowNodeRequest = new ModifyWorkflowNodeRequest();
         modifyWorkflowNodeRequest.setWorkflowId(WF_ID);
         modifyWorkflowNodeRequest.setId(1L);
@@ -74,7 +74,7 @@ class TestWorkflow extends ClientInitializer {
     }

     @Test
-    public void testSaveWorkflowDag() {
+    void testSaveWorkflowDag() {
         // DAG graph
         List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
         List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
@@ -96,50 +96,53 @@ class TestWorkflow extends ClientInitializer {
     }




     @Test
-    public void testDisableWorkflow() throws Exception {
+    void testDisableWorkflow() throws Exception {
         System.out.println(ohMyClient.disableWorkflow(WF_ID));
     }

     @Test
-    public void testDeleteWorkflow() throws Exception {
+    void testDeleteWorkflow() throws Exception {
         System.out.println(ohMyClient.deleteWorkflow(WF_ID));
     }

     @Test
-    public void testEnableWorkflow() throws Exception {
+    void testEnableWorkflow() throws Exception {
         System.out.println(ohMyClient.enableWorkflow(WF_ID));
     }

     @Test
-    public void testFetchWorkflowInfo() throws Exception {
+    void testFetchWorkflowInfo() throws Exception {
         System.out.println(ohMyClient.fetchWorkflow(WF_ID));
     }

     @Test
-    public void testRunWorkflow() throws Exception {
+    void testRunWorkflow() throws Exception {
         System.out.println(ohMyClient.runWorkflow(WF_ID));
     }

     @Test
-    public void testStopWorkflowInstance() throws Exception {
+    void testStopWorkflowInstance() throws Exception {
         System.out.println(ohMyClient.stopWorkflowInstance(149962433421639744L));
     }

     @Test
-    public void testRetryWorkflowInstance() {
+    void testRetryWorkflowInstance() {
         System.out.println(ohMyClient.retryWorkflowInstance(149962433421639744L));
     }

     @Test
-    public void testFetchWfInstanceInfo() throws Exception {
+    void testMarkWorkflowNodeAsSuccess() {
+        System.out.println(ohMyClient.markWorkflowNodeAsSuccess(149962433421639744L, 1L));
+    }
+
+    @Test
+    void testFetchWfInstanceInfo() throws Exception {
         System.out.println(ohMyClient.fetchWorkflowInstanceInfo(149962433421639744L));
     }

     @Test
-    public void testRunWorkflowPlus() throws Exception {
+    void testRunWorkflowPlus() throws Exception {
         System.out.println(ohMyClient.runWorkflow(WF_ID, "this is init Params 2", 90000));
     }
 }
@@ -53,4 +53,5 @@ public class OpenAPIConstant {
     public static final String STOP_WORKFLOW_INSTANCE = "/stopWfInstance";
     public static final String RETRY_WORKFLOW_INSTANCE = "/retryWfInstance";
     public static final String FETCH_WORKFLOW_INSTANCE_INFO = "/fetchWfInstanceInfo";
+    public static final String MARK_WORKFLOW_NODE_AS_SUCCESS = "/markWorkflowNodeAsSuccess";
 }
@@ -61,4 +61,8 @@ public class SystemInstanceResult {
      * disabled node
      */
     public static final String DISABLE_NODE = "disable node";
+    /**
+     * node marked as successful
+     */
+    public static final String MARK_AS_SUCCESSFUL_NODE = "mark as successful node";
 }
@@ -15,20 +15,27 @@ import java.util.List;
 @Getter
 @AllArgsConstructor
 public enum WorkflowInstanceStatus {

+    /**
+     * initial state: waiting to be scheduled
+     */
     WAITING(1, "等待调度"),
     RUNNING(2, "运行中"),
     FAILED(3, "失败"),
     SUCCEED(4, "成功"),
     STOPPED(10, "手动停止");

-    // generalized running status
-    public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING.v, RUNNING.v);
-    // finished status
-    public static final List<Integer> finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v);
+    /**
+     * generalized running status
+     */
+    public static final List<Integer> GENERALIZED_RUNNING_STATUS = Lists.newArrayList(WAITING.v, RUNNING.v);
+    /**
+     * finished status
+     */
+    public static final List<Integer> FINISHED_STATUS = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v);

-    private int v;
-    private String des;
+    private final int v;
+
+    private final String des;

     public static WorkflowInstanceStatus of(int v) {
         for (WorkflowInstanceStatus is : values()) {
@@ -168,7 +168,7 @@ public class CleanService {
         }
         try {
             Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
-            int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, WorkflowInstanceStatus.finishedStatus);
+            int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, WorkflowInstanceStatus.FINISHED_STATUS);
             log.info("[CleanService] deleted {} workflow instanceInfo records whose modify time before {}.", num, t);
         }catch (Exception e) {
             log.warn("[CleanService] clean workflow instanceInfo failed.", e);
@@ -163,7 +163,7 @@ public class WorkflowInstanceManager {
         }

         // concurrency control
-        int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.generalizedRunningStatus);
+        int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS);
         if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) {
             onWorkflowInstanceFailed(String.format(SystemInstanceResult.TOO_MANY_INSTANCES, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), wfInstanceInfo);
             return;
@@ -237,7 +237,7 @@ public class WorkflowInstanceManager {
         Long wfId = wfInstance.getWorkflowId();

         // special case: manually stopped while the workflow instance is no longer in a running state
-        if (status == InstanceStatus.STOPPED && !WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
+        if (status == InstanceStatus.STOPPED && !WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
             // caused by the user manually stopping the workflow instance, nothing to do
             return;
         }
@@ -267,7 +267,7 @@ public class WorkflowInstanceManager {
         wfInstance.setGmtModified(new Date());
         wfInstance.setDag(JSON.toJSONString(dag));
         // the workflow has already finished (a failed node made the whole workflow fail), only update the latest DAG
-        if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
+        if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
             workflowInstanceInfoRepository.saveAndFlush(wfInstance);
             log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", wfId, wfInstanceId, wfInstance.getStatus());
             return;
@@ -54,7 +54,7 @@ public class WorkflowInstanceService {
      */
     public void stopWorkflowInstance(Long wfInstanceId, Long appId) {
         WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
-        if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
+        if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
             throw new PowerJobException("workflow instance already stopped");
         }
         // stop all started but unfinished instances
@@ -91,7 +91,7 @@ public class WorkflowInstanceService {
     public void retryWorkflowInstance(Long wfInstanceId, Long appId) {
         WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
         // only failed workflows may be retried
-        if (WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
+        if (WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
             throw new PowerJobException("workflow instance is running");
         }
         if (wfInstance.getStatus() == WorkflowInstanceStatus.SUCCEED.getV()) {
@@ -145,4 +145,53 @@ public class WorkflowInstanceService {
         return wfInstance;
     }

+    /**
+     * Add by Echo009 on 2021/02/20
+     * Mark a node as successful.
+     * Note: only nodes that actually failed and are not allowed to skip on failure can be marked,
+     * i.e. nodes in the "failed and not skippable" state.
+     * Only the node information (status, result) in the workflow instance DAG is changed;
+     * nothing in the corresponding job instance is touched.
+     *
+     * @param wfInstanceId workflow instance ID
+     * @param nodeId node ID
+     */
+    public void markNodeAsSuccess(Long appId, Long wfInstanceId, Long nodeId) {
+
+        WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
+        // check the workflow instance status; running instances must not be processed
+        if (WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
+            throw new PowerJobException("you can't mark the node in a running workflow!");
+        }
+        // deserialization is guaranteed to succeed here
+        PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
+        PEWorkflowDAG.Node targetNode = null;
+        for (PEWorkflowDAG.Node node : dag.getNodes()) {
+            if (node.getNodeId().equals(nodeId)) {
+                targetNode = node;
+                break;
+            }
+        }
+        if (targetNode == null) {
+            throw new PowerJobException("can't find the node in current DAG!");
+        }
+        boolean allowSkipWhenFailed = targetNode.getSkipWhenFailed() != null && targetNode.getSkipWhenFailed();
+        // only nodes that failed and are not allowed to skip on failure may be processed
+        if (targetNode.getInstanceId() != null
+                && targetNode.getStatus() == InstanceStatus.FAILED.getV()
+                // skip on failure not allowed
+                && !allowSkipWhenFailed) {
+            // only update the node information inside the workflow instance
+            targetNode.setStatus(InstanceStatus.SUCCEED.getV())
+                    .setResult(SystemInstanceResult.MARK_AS_SUCCESSFUL_NODE);
+
+            wfInstance.setDag(JSON.toJSONString(dag));
+            wfInstanceInfoRepository.saveAndFlush(wfInstance);
+            return;
+        }
+        // reject all other cases
+        throw new PowerJobException("you can only mark the node which is failed and not allow to skip!");
+
+    }

 }
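The guard in markNodeAsSuccess above is the heart of the feature: a node may only be marked when it actually ran (instanceId is set), ended in FAILED, and is not configured to skip on failure, and only the DAG copy stored on the workflow instance is rewritten. A standalone restatement of that check, as a sketch: the PEWorkflowDAG import path is an assumption, while the accessors are the ones used in the diff.

import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;

final class MarkNodeRule {

    private MarkNodeRule() {
    }

    // Mirrors the condition in WorkflowInstanceService#markNodeAsSuccess.
    static boolean canBeMarkedAsSuccess(PEWorkflowDAG.Node node) {
        boolean allowSkipWhenFailed = node.getSkipWhenFailed() != null && node.getSkipWhenFailed();
        return node.getInstanceId() != null
                && node.getStatus() == InstanceStatus.FAILED.getV()
                && !allowSkipWhenFailed;
    }
}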
@@ -208,6 +208,12 @@ public class OpenAPIController {
         return ResultDTO.success(null);
     }

+    @PostMapping(OpenAPIConstant.MARK_WORKFLOW_NODE_AS_SUCCESS)
+    public ResultDTO<Void> markWorkflowNodeAsSuccess(Long wfInstanceId, Long nodeId, Long appId) {
+        workflowInstanceService.markNodeAsSuccess(appId, wfInstanceId, nodeId);
+        return ResultDTO.success(null);
+    }
+
     @PostMapping(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO)
     public ResultDTO<WorkflowInstanceInfoDTO> fetchWorkflowInstanceInfo(Long wfInstanceId, Long appId) {
         return ResultDTO.success(workflowInstanceService.fetchWorkflowInstanceInfo(wfInstanceId, appId));
@@ -44,11 +44,18 @@ public class WorkflowInstanceController {
     }

     @RequestMapping("/retry")
-    public ResultDTO<Void> retryWfInstance(Long wfInstanceId, Long appId){
+    public ResultDTO<Void> retryWfInstance(Long wfInstanceId, Long appId) {
         workflowInstanceService.retryWorkflowInstance(wfInstanceId, appId);
         return ResultDTO.success(null);
     }

+    @RequestMapping("/markNodeAsSuccess")
+    public ResultDTO<Void> markNodeAsSuccess(Long wfInstanceId, Long appId, Long nodeId) {
+        workflowInstanceService.markNodeAsSuccess(appId, wfInstanceId, nodeId);
+        return ResultDTO.success(null);
+    }
+
+
     @GetMapping("/info")
     public ResultDTO<WorkflowInstanceInfoVO> getInfo(Long wfInstanceId, Long appId) {
         WorkflowInstanceInfoDO wfInstanceDO = workflowInstanceService.fetchWfInstance(wfInstanceId, appId);
@@ -111,7 +111,7 @@ public class RepositoryTest {

     @Test
     public void testDeleteWorkflowInstanceInfo() {
-        workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), WorkflowInstanceStatus.finishedStatus);
+        workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), WorkflowInstanceStatus.FINISHED_STATUS);
     }

 }