fix: the problem of retrying nested workflow nodes

This commit is contained in:
Echo009 2022-02-10 19:36:37 +08:00
parent d4eb8e3303
commit 5791b43ac6
3 changed files with 43 additions and 16 deletions

View File

@ -26,8 +26,6 @@ import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import static tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils.isNotAllowSkipWhenFailed;
/**
* 工作流实例服务
*
@ -135,13 +133,7 @@ public class WorkflowInstanceService {
if (!workflowInfo.isPresent() || workflowInfo.get().getStatus() == SwitchableStatus.DISABLE.getV()) {
throw new PowerJobException("you can't retry the workflow instance whose metadata is unavailable!");
}
// 将需要重试的节点状态重置失败且不允许跳过的 或者 手动终止的
for (PEWorkflowDAG.Node node : dag.getNodes()) {
boolean realFailed = node.getStatus() == InstanceStatus.FAILED.getV() && isNotAllowSkipWhenFailed(node);
if (realFailed || node.getStatus() == InstanceStatus.STOPPED.getV()) {
node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()).setInstanceId(null);
}
}
WorkflowDAGUtils.resetRetryableNode(dag);
wfInstance.setDag(JSON.toJSONString(dag));
// 更新工作流实例状态不覆盖实际触发时间
wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());

View File

@ -3,6 +3,7 @@ package tech.powerjob.server.core.workflow.algorithm;
import com.google.common.collect.*;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.serialize.JsonUtils;
@ -22,6 +23,24 @@ public class WorkflowDAGUtils {
}
/**
 * Resets the state of every retryable node in the given DAG so that it can be dispatched again.
 * <p>
 * A node is considered retryable when either
 * <ul>
 *   <li>its status is FAILED and {@code isNotAllowSkipWhenFailed(node)} returns true, or</li>
 *   <li>its status is STOPPED.</li>
 * </ul>
 * A retryable node is moved back to WAITING_DISPATCH. Its instance id is cleared only when the
 * node is a job node (node type {@code null} or JOB); every other node type keeps its instance id.
 * <p>
 * Nodes whose status is {@code null} are left untouched: they are neither FAILED nor STOPPED,
 * so there is nothing to reset, and skipping them avoids unboxing a {@code null} status.
 *
 * @param dag a valid directed acyclic graph; its nodes are modified in place
 */
public static void resetRetryableNode(PEWorkflowDAG dag) {
    for (PEWorkflowDAG.Node node : dag.getNodes()) {
        Integer status = node.getStatus();
        if (status == null) {
            // No status recorded yet, hence not failed/stopped: nothing to reset.
            continue;
        }
        // Compare as a primitive so the check never degrades into a reference comparison.
        int statusValue = status;
        // "Really failed" means failed AND the failure may not be skipped.
        boolean realFailed = statusValue == InstanceStatus.FAILED.getV() && isNotAllowSkipWhenFailed(node);
        boolean stopped = statusValue == InstanceStatus.STOPPED.getV();
        if (!realFailed && !stopped) {
            continue;
        }
        node.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
        // Only job nodes get their instance id cleared; other node types keep it.
        // NOTE(review): presumably non-job nodes (e.g. nested workflows) reuse the existing
        // instance id on retry — confirm against the corresponding node handlers.
        if (node.getNodeType() == null || node.getNodeType() == WorkflowNodeType.JOB.getCode()) {
            node.setInstanceId(null);
        }
    }
}
/**
* 获取所有根节点
*
@ -173,9 +192,9 @@ public class WorkflowDAGUtils {
private static boolean isReadyNode(long nodeId, Map<Long, PEWorkflowDAG.Node> nodeId2Node, Multimap<Long, Long> relyMap) {
PEWorkflowDAG.Node currentNode = nodeId2Node.get(nodeId);
int currentNodeStatus = currentNode.getStatus() == null ? InstanceStatus.WAITING_DISPATCH.getV() : currentNode.getStatus();
// 跳过已完成节点处理成功 或者 处理失败和已派发节点存在 InstanceId
// 跳过已完成节点处理成功 或者 处理失败和已派发节点 状态为运行中
if (InstanceStatus.FINISHED_STATUS.contains(currentNodeStatus)
|| currentNode.getInstanceId() != null) {
|| currentNodeStatus == InstanceStatus.RUNNING.getV()) {
return false;
}
Collection<Long> relyNodeIds = relyMap.get(nodeId);

View File

@ -1,7 +1,9 @@
package tech.powerjob.server.core.workflow.hanlder.impl;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
@ -10,6 +12,7 @@ import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
@ -50,15 +53,28 @@ public class NestedWorkflowNodeHandler implements TaskNodeHandler {
throw new PowerJobException("invalid nested workflow node," + node.getNodeId());
}
if (node.getInstanceId() != null) {
// 处理重试的情形不需要创建实例仅需要更改对应实例的状态
// 处理重试的情形不需要创建实例仅需要更改对应实例的状态以及相应的节点状态
WorkflowInstanceInfoDO wfInstance = workflowInstanceInfoRepository.findByWfInstanceId(node.getInstanceId()).orElse(null);
if (wfInstance == null) {
log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow instance({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getInstanceId());
throw new PowerJobException("invalid workflow instance id" + node.getNodeId());
throw new PowerJobException("invalid nested workflow instance id " + node.getInstanceId());
}
// 不用考虑状态只有失败的工作流嵌套节点状态会被重置
// 需要将子工作流中失败的节点状态重置为 等待 派发
try {
PEWorkflowDAG nodeDag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
if (!WorkflowDAGUtils.valid(nodeDag)) {
throw new PowerJobException(SystemInstanceResult.INVALID_DAG);
}
WorkflowDAGUtils.resetRetryableNode(nodeDag);
wfInstance.setDag(JSON.toJSONString(nodeDag));
wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
wfInstance.setGmtModified(new Date());
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
} catch (Exception e) {
log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow instance({})'s DAG is illegal!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getInstanceId(),e);
throw new PowerJobException("illegal nested workflow instance, id : "+ node.getInstanceId());
}
wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
wfInstance.setGmtModified(new Date());
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
} else {
// 透传当前的上下文创建新的工作流实例
String wfContext = wfInstanceInfo.getWfContext();