diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java index 273e0d02..7b4a5da1 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java @@ -26,8 +26,6 @@ import java.util.Date; import java.util.Objects; import java.util.Optional; -import static tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils.isNotAllowSkipWhenFailed; - /** * 工作流实例服务 * @@ -135,13 +133,7 @@ public class WorkflowInstanceService { if (!workflowInfo.isPresent() || workflowInfo.get().getStatus() == SwitchableStatus.DISABLE.getV()) { throw new PowerJobException("you can't retry the workflow instance whose metadata is unavailable!"); } - // 将需要重试的节点状态重置(失败且不允许跳过的 或者 手动终止的) - for (PEWorkflowDAG.Node node : dag.getNodes()) { - boolean realFailed = node.getStatus() == InstanceStatus.FAILED.getV() && isNotAllowSkipWhenFailed(node); - if (realFailed || node.getStatus() == InstanceStatus.STOPPED.getV()) { - node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()).setInstanceId(null); - } - } + WorkflowDAGUtils.resetRetryableNode(dag); wfInstance.setDag(JSON.toJSONString(dag)); // 更新工作流实例状态,不覆盖实际触发时间 wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAGUtils.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAGUtils.java index 4bbe219f..936c744d 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAGUtils.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAGUtils.java @@ -3,6 +3,7 @@ package tech.powerjob.server.core.workflow.algorithm; import com.google.common.collect.*; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.serialize.JsonUtils; @@ -22,6 +23,24 @@ public class WorkflowDAGUtils { } + /** + * 重置可重试节点的状态信息 + * @param dag 合法的有向无环图 + */ + public static void resetRetryableNode(PEWorkflowDAG dag){ + // 将需要重试的节点状态重置(失败且不允许跳过的 或者 手动终止的) + for (PEWorkflowDAG.Node node : dag.getNodes()) { + boolean realFailed = node.getStatus() == InstanceStatus.FAILED.getV() && isNotAllowSkipWhenFailed(node); + if (realFailed || node.getStatus() == InstanceStatus.STOPPED.getV()) { + node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); + // 仅重置任务节点的实例 id 信息 + if (node.getNodeType() == null || node.getNodeType() == WorkflowNodeType.JOB.getCode()){ + node.setInstanceId(null); + } + } + } + } + /** * 获取所有根节点 * @@ -173,9 +192,9 @@ public class WorkflowDAGUtils { private static boolean isReadyNode(long nodeId, Map nodeId2Node, Multimap relyMap) { PEWorkflowDAG.Node currentNode = nodeId2Node.get(nodeId); int currentNodeStatus = currentNode.getStatus() == null ? InstanceStatus.WAITING_DISPATCH.getV() : currentNode.getStatus(); - // 跳过已完成节点(处理成功 或者 处理失败)和已派发节点(存在 InstanceId) + // 跳过已完成节点(处理成功 或者 处理失败)和已派发节点( 状态为运行中 ) if (InstanceStatus.FINISHED_STATUS.contains(currentNodeStatus) - || currentNode.getInstanceId() != null) { + || currentNodeStatus == InstanceStatus.RUNNING.getV()) { return false; } Collection relyNodeIds = relyMap.get(nodeId); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java index de7d593e..e7d0de99 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java @@ -1,7 +1,9 @@ package tech.powerjob.server.core.workflow.hanlder.impl; +import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.WorkflowInstanceStatus; import tech.powerjob.common.enums.WorkflowNodeType; @@ -10,6 +12,7 @@ import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.core.workflow.WorkflowInstanceManager; +import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler; import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; @@ -50,15 +53,28 @@ public class NestedWorkflowNodeHandler implements TaskNodeHandler { throw new PowerJobException("invalid nested workflow node," + node.getNodeId()); } if (node.getInstanceId() != null) { - // 处理重试的情形,不需要创建实例,仅需要更改对应实例的状态 + // 处理重试的情形,不需要创建实例,仅需要更改对应实例的状态,以及相应的节点状态 WorkflowInstanceInfoDO wfInstance = workflowInstanceInfoRepository.findByWfInstanceId(node.getInstanceId()).orElse(null); if (wfInstance == null) { log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow instance({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getInstanceId()); - throw new PowerJobException("invalid workflow instance id" + node.getNodeId()); + throw new PowerJobException("invalid nested workflow instance id " + node.getInstanceId()); + } + // 不用考虑状态,只有失败的工作流嵌套节点状态会被重置 + // 需要将子工作流中失败的节点状态重置为 等待 派发 + try { + PEWorkflowDAG nodeDag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); + if (!WorkflowDAGUtils.valid(nodeDag)) { + throw new PowerJobException(SystemInstanceResult.INVALID_DAG); + } + WorkflowDAGUtils.resetRetryableNode(nodeDag); + wfInstance.setDag(JSON.toJSONString(nodeDag)); + wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); + wfInstance.setGmtModified(new Date()); + workflowInstanceInfoRepository.saveAndFlush(wfInstance); + } catch (Exception e) { + log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow instance({})'s DAG is illegal!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getInstanceId(),e); + throw new PowerJobException("illegal nested workflow instance, id : "+ node.getInstanceId()); } - wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); - wfInstance.setGmtModified(new Date()); - workflowInstanceInfoRepository.saveAndFlush(wfInstance); } else { // 透传当前的上下文创建新的工作流实例 String wfContext = wfInstanceInfo.getWfContext();