From c3cc8aef4c06852453503181c23c6d4457f24be6 Mon Sep 17 00:00:00 2001 From: Echo009 Date: Thu, 18 Feb 2021 15:20:16 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=B7=A5=E4=BD=9C=E6=B5=81=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E8=8A=82=E7=82=B9=E7=A6=81=E7=94=A8=EF=BC=8C=E5=8E=9F?= =?UTF-8?q?=E5=9C=B0=E9=87=8D=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../powerjob/common/SystemInstanceResult.java | 5 +- .../powerjob/common/model/PEWorkflowDAG.java | 2 + .../server/common/utils/WorkflowDAGUtils.java | 129 ++++++++++- .../workflow/WorkflowInstanceManager.java | 128 +++++------ .../workflow/WorkflowInstanceService.java | 63 +++++- .../WorkflowInstanceController.java | 6 + .../kfcfans/powerjob/server/test/DAGTest.java | 213 +++++++++++++++++- 7 files changed, 476 insertions(+), 70 deletions(-) diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java index 79346426..7300cea0 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java @@ -57,5 +57,8 @@ public class SystemInstanceResult { */ public static final String INVALID_DAG = "invalid dag"; - + /** + * 被禁用的节点 + */ + public static final String DISABLE_NODE = "disable node"; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java index 5a730c07..8aceef46 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java @@ -6,6 +6,7 @@ import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -35,6 +36,7 @@ public class PEWorkflowDAG implements Serializable { * Point. */ @Data + @Accessors(chain = true) @NoArgsConstructor @AllArgsConstructor public static class Node implements Serializable { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java index 6ac7d823..ab20feb4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java @@ -2,13 +2,13 @@ package com.github.kfcfans.powerjob.server.common.utils; import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.PowerJobException; +import com.github.kfcfans.powerjob.common.SystemInstanceResult; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.server.model.WorkflowDAG; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import com.google.common.collect.*; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -24,6 +24,7 @@ public class WorkflowDAGUtils { private WorkflowDAGUtils() { } + /** * 获取所有根节点 * @@ -72,6 +73,128 @@ public class WorkflowDAGUtils { return false; } + + /** + * Add by Echo009 on 2021/02/08 + * 获取准备好的节点(非完成状态的节点且,前置依赖节点为空或者均处于完成状态) + * 注意,这里会直接将当前 disable(enable = false)的节点的状态置为完成 + * + * @param dag 点线表示法的DAG图 + * @return 当前可执行的节点 + */ + public static List listReadyNodes(PEWorkflowDAG dag) { + // 保存 nodeId -> Node 的映射关系 + Map nodeId2Node = Maps.newHashMap(); + + List dagNodes = dag.getNodes(); + for (PEWorkflowDAG.Node node : dagNodes) { + nodeId2Node.put(node.getNodeId(), node); + } + // 构建依赖树(下游任务需要哪些上游任务完成才能执行) + Multimap relyMap = LinkedListMultimap.create(); + // 后继节点 Map + Multimap successorMap = LinkedListMultimap.create(); + dag.getEdges().forEach(edge -> { + relyMap.put(edge.getTo(), edge.getFrom()); + successorMap.put(edge.getFrom(), edge.getTo()); + }); + List readyNodes = Lists.newArrayList(); + List skipNodes = Lists.newArrayList(); + + for (PEWorkflowDAG.Node currentNode : dagNodes) { + if (!isReadyNode(currentNode.getNodeId(), nodeId2Node, relyMap)) { + continue; + } + // 需要直接跳过的节点 + if (currentNode.getEnable() != null && !currentNode.getEnable()) { + skipNodes.add(currentNode); + } else { + readyNodes.add(currentNode); + } + } + // 当前直接跳过的节点不为空 + if (!skipNodes.isEmpty()) { + for (PEWorkflowDAG.Node skipNode : skipNodes) { + // move + readyNodes.addAll(moveAndObtainReadySuccessor(skipNode, nodeId2Node, relyMap, successorMap)); + } + } + return readyNodes; + } + + /** + * 移动并获取就绪的后继节点 + * + * @param skippedNode 当前需要跳过的节点 + * @param nodeId2Node nodeId -> Node + * @param relyMap to-node id -> list of from-node id + * @param successorMap from-node id -> list of to-node id + * @return 就绪的后继节点 + */ + private static List moveAndObtainReadySuccessor(PEWorkflowDAG.Node skippedNode, Map nodeId2Node, Multimap relyMap, Multimap successorMap) { + + // 更新当前跳过节点的状态 + skippedNode.setStatus(InstanceStatus.SUCCEED.getV()); + skippedNode.setResult(SystemInstanceResult.DISABLE_NODE); + // 有可能出现需要连续移动的情况 + List readyNodes = Lists.newArrayList(); + List skipNodes = Lists.newArrayList(); + // 获取当前跳过节点的后继节点 + Collection successors = successorMap.get(skippedNode.getNodeId()); + for (Long successor : successors) { + // 判断后继节点是否处于 Ready 状态(前驱节点均处于完成状态) + if (isReadyNode(successor, nodeId2Node, relyMap)) { + PEWorkflowDAG.Node node = nodeId2Node.get(successor); + if (node.getEnable() != null && !node.getEnable()) { + // 需要跳过 + skipNodes.add(node); + continue; + } + readyNodes.add(node); + } + } + // 深度优先,继续移动 + if (!skipNodes.isEmpty()) { + for (PEWorkflowDAG.Node node : skipNodes) { + readyNodes.addAll(moveAndObtainReadySuccessor(node, nodeId2Node, relyMap, successorMap)); + } + } + return readyNodes; + } + + /** + * 判断当前节点是否准备就绪 + * + * @param nodeId Node id + * @param nodeId2Node Node id -> Node + * @param relyMap to-node id -> list of from-node id + * @return true if current node is ready + */ + private static boolean isReadyNode(long nodeId, Map nodeId2Node, Multimap relyMap) { + PEWorkflowDAG.Node currentNode = nodeId2Node.get(nodeId); + int currentNodeStatus = currentNode.getStatus() == null ? InstanceStatus.WAITING_DISPATCH.getV() : currentNode.getStatus(); + // 跳过已完成节点(处理成功 或者 处理失败)和已派发节点(存在 InstanceId) + if (InstanceStatus.FINISHED_STATUS.contains(currentNodeStatus) + || currentNode.getInstanceId() != null) { + return false; + } + Collection relyNodeIds = relyMap.get(nodeId); + for (Long relyNodeId : relyNodeIds) { + PEWorkflowDAG.Node relyNode = nodeId2Node.get(relyNodeId); + int relyNodeStatus = relyNode.getStatus() == null ? InstanceStatus.WAITING_DISPATCH.getV() : relyNode.getStatus(); + // 只要依赖的节点有一个未完成,那么就不是就绪状态 + // 注意,这里允许失败的原因是有允许失败跳过节点的存在,对于不允许跳过的失败节点,一定走不到这里(工作流会被打断) + if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(relyNodeStatus)) { + return false; + } + } + return true; + } + + public static boolean isNotAllowSkipWhenFailed(PEWorkflowDAG.Node node) { + // 默认不允许跳过 + return node.getSkipWhenFailed() == null || !node.getSkipWhenFailed(); + } /** * 将点线表示法的DAG图转化为引用表达法的DAG图 * diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index fe658509..664167dd 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -29,6 +29,8 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.*; +import static com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils.isNotAllowSkipWhenFailed; + /** * 管理运行中的工作流实例 * @@ -94,10 +96,26 @@ public class WorkflowInstanceManager { newWfInstance.setGmtCreate(now); newWfInstance.setGmtModified(now); + // 校验 DAG 信息 + PEWorkflowDAG dag = null; + try { + dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); + // 校验 + if (!WorkflowDAGUtils.valid(dag)) { + throw new PowerJobException(SystemInstanceResult.INVALID_DAG); + } + } catch (Exception e) { + log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you has modified the DAG info directly in database!", wfId, wfInstanceId); + onWorkflowInstanceFailed(SystemInstanceResult.INVALID_DAG, newWfInstance); + return newWfInstance.getWfInstanceId(); + } // 校验合法性(工作是否存在且启用) Set allJobIds = Sets.newHashSet(); - PEWorkflowDAG dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); - dag.getNodes().forEach(node -> allJobIds.add(node.getJobId())); + dag.getNodes().forEach(node -> { + allJobIds.add(node.getJobId()); + // 将节点的初始状态置为等待派发 + node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); + }); int needNum = allJobIds.size(); long dbNum = jobInfoRepository.countByAppIdAndStatusAndIdIn(wfInfo.getAppId(), SwitchableStatus.ENABLE.getV(), allJobIds); log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum); @@ -107,16 +125,9 @@ public class WorkflowInstanceManager { log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", wfId, wfInstanceId, needNum, dbNum); onWorkflowInstanceFailed(SystemInstanceResult.CAN_NOT_FIND_JOB, newWfInstance); } else { - // 再次校验 DAG - boolean valid = WorkflowDAGUtils.valid(dag); - if (!valid) { - log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you has modified the DAG info directly in database!", wfId, wfInstanceId); - onWorkflowInstanceFailed(SystemInstanceResult.INVALID_DAG, newWfInstance); - } else { - initNodeInfo(dag); - // 再 set 一次,此时工作流中的节点信息已经完全初始化 - newWfInstance.setDag(JSON.toJSONString(dag)); - } + initNodeInfo(dag); + // 再 set 一次,此时工作流中的节点信息已经完全初始化 + newWfInstance.setDag(JSON.toJSONString(dag)); workflowInstanceInfoRepository.saveAndFlush(newWfInstance); } return wfInstanceId; @@ -129,6 +140,7 @@ public class WorkflowInstanceManager { * 1、工作流支持配置重复的任务节点 * 2、移除参数 initParams,改为统一从工作流实例中获取 * 传递工作流实例的 wfContext 作为 初始启动参数 + * 3、通过 {@link WorkflowDAGUtils#listReadyNodes} 兼容原地重试逻辑 * ******************************************** * * @param wfInfo 工作流任务信息 @@ -158,31 +170,36 @@ public class WorkflowInstanceManager { try { // 从实例中读取工作流信息 - PEWorkflowDAG peWorkflowDAG = JSON.parseObject(wfInstanceInfo.getDag(), PEWorkflowDAG.class); - List roots = WorkflowDAGUtils.listRoots(peWorkflowDAG); - - peWorkflowDAG.getNodes().forEach(node -> node.setStatus(InstanceStatus.WAITING_DISPATCH.getV())); + PEWorkflowDAG dag = JSON.parseObject(wfInstanceInfo.getDag(), PEWorkflowDAG.class); + // 根节点有可能被 disable + List readyNodes = WorkflowDAGUtils.listReadyNodes(dag); // 创建所有的根任务 - roots.forEach(root -> { + readyNodes.forEach(readyNode -> { // 注意:这里必须保证任务实例全部创建成功,如果在这里创建实例部分失败,会导致 DAG 信息不会更新,已经生成的实例节点在工作流日志中没法展示 // instanceParam 传递的是工作流实例的 wfContext - Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), root.getJobParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis()); - root.setInstanceId(instanceId); - root.setStatus(InstanceStatus.RUNNING.getV()); + Long instanceId = instanceService.create(readyNode.getJobId(), wfInfo.getAppId(), readyNode.getJobParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis()); + readyNode.setInstanceId(instanceId); + readyNode.setStatus(InstanceStatus.RUNNING.getV()); - log.info("[Workflow-{}|{}] create root instance(nodeId={},jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, root.getNodeId(), root.getJobId(), instanceId); + log.info("[Workflow-{}|{}] create readyNode instance(nodeId={},jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), instanceId); }); // 持久化 wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV()); - wfInstanceInfo.setDag(JSON.toJSONString(peWorkflowDAG)); + wfInstanceInfo.setDag(JSON.toJSONString(dag)); + if (readyNodes.isEmpty()) { + // 没有就绪的节点(所有节点都被禁用) + wfInstanceInfo.setStatus(WorkflowInstanceStatus.SUCCEED.getV()); + log.warn("[Workflow-{}|{}] workflowInstance({}) needn't running ", wfInfo.getId(), wfInstanceId, wfInstanceInfo); + workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo); + return; + } workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo); log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId); // 真正开始执行根任务 - roots.forEach(this::runInstance); + readyNodes.forEach(this::runInstance); } catch (Exception e) { - log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e); onWorkflowInstanceFailed(e.getMessage(), wfInstanceInfo); } @@ -195,6 +212,7 @@ public class WorkflowInstanceManager { * 1、工作流支持配置重复的任务节点 * 2、不再获取上游任务的结果作为实例参数而是传递工作流 * 实例的 wfContext 作为 实例参数 + * 3、通过 {@link WorkflowDAGUtils#listReadyNodes} 支持跳过禁用的节点 * ******************************************** * * @param wfInstanceId 工作流任务实例ID @@ -225,9 +243,6 @@ public class WorkflowInstanceManager { try { PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); - // 保存 nodeId -> Node 的映射关系 - Map nodeId2Node = Maps.newHashMap(); - // 更新完成节点状态 boolean allFinished = true; PEWorkflowDAG.Node instanceNode = null; @@ -241,7 +256,6 @@ public class WorkflowInstanceManager { if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) { allFinished = false; } - nodeId2Node.put(node.getNodeId(), node); } if (instanceNode == null) { // DAG 中的节点实例已经被覆盖(原地重试,生成了新的实例信息),直接忽略 @@ -259,7 +273,7 @@ public class WorkflowInstanceManager { } // 任务失败 && 不允许失败跳过,DAG流程被打断,整体失败 - if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode) ) { + if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) { log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId); onWorkflowInstanceFailed(SystemInstanceResult.MIDDLE_JOB_FAILED, wfInstance); return; @@ -275,7 +289,12 @@ public class WorkflowInstanceManager { log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", wfId, wfInstanceId, instanceId); return; } - + // 注意:这里会直接跳过 disable 的节点 + List readyNodes = WorkflowDAGUtils.listReadyNodes(dag); + // 如果没有就绪的节点,需要再次判断是否已经全部完成 + if (readyNodes.isEmpty() && isFinish(dag)) { + allFinished = true; + } // 工作流执行完毕(能执行到这里代表该工作流内所有子任务都执行成功了) if (allFinished) { wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV()); @@ -288,36 +307,14 @@ public class WorkflowInstanceManager { return; } - // 构建依赖树(下游任务需要哪些上游任务完成才能执行) - Multimap relyMap = LinkedListMultimap.create(); - dag.getEdges().forEach(edge -> relyMap.put(edge.getTo(), edge.getFrom())); - - // 重新计算需要派发的任务 - List readyNodes = Lists.newArrayList(); - relyMap.keySet().forEach(nodeId -> { - PEWorkflowDAG.Node currentNode = nodeId2Node.get(nodeId); - // 跳过已完成节点(处理成功 或者 处理失败)和已派发节点(存在 InstanceId) - if (currentNode.getStatus() == InstanceStatus.SUCCEED.getV() - || currentNode.getStatus() == InstanceStatus.FAILED.getV() - || currentNode.getInstanceId() != null) { - return; - } - // 判断某个任务所有依赖的完成情况,只要有一个未完成,即无法执行 - for (Long reliedNodeId : relyMap.get(nodeId)) { - // 注意,这里允许失败的原因是有允许失败跳过节点的存在,对于不允许跳过的失败节点,一定走不到这里(工作流会被打断) - if (nodeId2Node.get(reliedNodeId).getStatus() != InstanceStatus.SUCCEED.getV() - && nodeId2Node.get(reliedNodeId).getStatus() != InstanceStatus.FAILED.getV()) { - return; - } - } + for (PEWorkflowDAG.Node readyNode : readyNodes) { // 同理:这里必须保证任务实例全部创建成功,避免部分失败导致已经生成的实例节点在工作流日志中没法展示 // instanceParam 传递的是工作流实例的 wfContext - Long newInstanceId = instanceService.create(currentNode.getJobId(), wfInstance.getAppId(), currentNode.getJobParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis()); - currentNode.setInstanceId(newInstanceId); - currentNode.setStatus(InstanceStatus.RUNNING.getV()); - readyNodes.add(currentNode); - log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, currentNode.getNodeId(), currentNode.getJobId(), newInstanceId); - }); + Long newInstanceId = instanceService.create(readyNode.getJobId(), wfInstance.getAppId(), readyNode.getJobParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis()); + readyNode.setInstanceId(newInstanceId); + readyNode.setStatus(InstanceStatus.RUNNING.getV()); + log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), newInstanceId); + } wfInstance.setDag(JSON.toJSONString(dag)); workflowInstanceInfoRepository.saveAndFlush(wfInstance); @@ -372,7 +369,6 @@ public class WorkflowInstanceManager { /** * 初始化节点信息 * - * * @param dag pe dag * @since 20210205 */ @@ -418,6 +414,16 @@ public class WorkflowInstanceManager { dispatchService.dispatch(jobInfo, node.getInstanceId()); } + + private boolean isFinish(PEWorkflowDAG dag){ + for (PEWorkflowDAG.Node node : dag.getNodes()) { + if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) { + return false; + } + } + return true; + } + private void onWorkflowInstanceFailed(String result, WorkflowInstanceInfoDO wfInstance) { wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV()); @@ -444,8 +450,4 @@ public class WorkflowInstanceManager { } } - private boolean isNotAllowSkipWhenFailed(PEWorkflowDAG.Node node) { - // 默认不允许跳过 - return node.getSkipWhenFailed() == null || !node.getSkipWhenFailed(); - } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java index 7ca5763a..9cbe2ae8 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java @@ -7,8 +7,11 @@ import com.github.kfcfans.powerjob.common.SystemInstanceResult; import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; import com.github.kfcfans.powerjob.common.response.WorkflowInstanceInfoDTO; +import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils; +import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInstanceInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.powerjob.server.service.instance.InstanceService; import lombok.extern.slf4j.Slf4j; @@ -18,6 +21,9 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Date; import java.util.Objects; +import java.util.Optional; + +import static com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils.isNotAllowSkipWhenFailed; /** * 工作流实例服务 @@ -33,12 +39,17 @@ public class WorkflowInstanceService { private InstanceService instanceService; @Resource private WorkflowInstanceInfoRepository wfInstanceInfoRepository; + @Resource + private WorkflowInstanceManager workflowInstanceManager; + @Resource + private WorkflowInfoRepository workflowInfoRepository; /** * 停止工作流实例 + * * @param wfInstanceId 工作流实例ID - * @param appId 所属应用ID + * @param appId 所属应用ID */ public void stopWorkflowInstance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); @@ -56,7 +67,7 @@ public class WorkflowInstanceService { instanceService.stopInstance(node.getInstanceId()); } - }catch (Exception e) { + } catch (Exception e) { log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSON.toJSONString(node), e); } }); @@ -70,6 +81,54 @@ public class WorkflowInstanceService { log.info("[WfInstance-{}] stop workflow instance successfully~", wfInstanceId); } + /** + * Add by Echo009 on 2021/02/07 + * + * @param wfInstanceId 工作流实例ID + * @param appId 应用ID + */ + public void retryWorkflowInstance(Long wfInstanceId, Long appId) { + WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); + // 仅允许重试 失败的工作流 + if (WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { + throw new PowerJobException("workflow instance is running"); + } + if (wfInstance.getStatus() == WorkflowInstanceStatus.SUCCEED.getV()) { + throw new PowerJobException("workflow instance is already successful"); + } + // 因为 DAG 非法而终止的工作流实例无法重试 + PEWorkflowDAG dag = null; + try { + dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); + if (!WorkflowDAGUtils.valid(dag)) { + throw new PowerJobException(SystemInstanceResult.INVALID_DAG); + } + + } catch (Exception e) { + throw new PowerJobException("you can't retry the workflow instance whose DAG is illegal!"); + } + // 检查当前工作流信息 + Optional workflowInfo = workflowInfoRepository.findById(wfInstance.getWorkflowId()); + if (!workflowInfo.isPresent() || workflowInfo.get().getStatus() == SwitchableStatus.DISABLE.getV()) { + throw new PowerJobException("you can't retry the workflow instance whose metadata is unavailable!"); + } + // 将需要重试的节点状态重置(失败且不允许跳过的) + for (PEWorkflowDAG.Node node : dag.getNodes()) { + if (node.getStatus() == InstanceStatus.FAILED.getV() + && isNotAllowSkipWhenFailed(node)) { + node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()).setInstanceId(null); + } + } + wfInstance.setDag(JSON.toJSONString(dag)); + // 更新工作流实例状态,不覆盖实际触发时间 + wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); + wfInstance.setGmtModified(new Date()); + wfInstanceInfoRepository.saveAndFlush(wfInstance); + // 立即开始 + workflowInstanceManager.start(workflowInfo.get(), wfInstanceId); + } + + public WorkflowInstanceInfoDTO fetchWorkflowInstanceInfo(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); WorkflowInstanceInfoDTO dto = new WorkflowInstanceInfoDTO(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowInstanceController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowInstanceController.java index 67c1101f..1878012e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowInstanceController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowInstanceController.java @@ -43,6 +43,12 @@ public class WorkflowInstanceController { return ResultDTO.success(null); } + @RequestMapping("/retry") + public ResultDTO retryWfInstance(Long wfInstanceId, Long appId){ + workflowInstanceService.retryWorkflowInstance(wfInstanceId, appId); + return ResultDTO.success(null); + } + @GetMapping("/info") public ResultDTO getInfo(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstanceDO = workflowInstanceService.fetchWfInstance(wfInstanceId, appId); diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/DAGTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/DAGTest.java index 08a0dbc5..678859f0 100644 --- a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/DAGTest.java +++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/DAGTest.java @@ -1,14 +1,17 @@ package com.github.kfcfans.powerjob.server.test; import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; -import com.github.kfcfans.powerjob.server.model.WorkflowDAG; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils; +import com.github.kfcfans.powerjob.server.model.WorkflowDAG; import com.google.common.collect.Lists; +import org.junit.Assert; import org.junit.Test; import java.util.List; +import java.util.stream.Collectors; /** * DAG 图算法测试集合 @@ -80,5 +83,213 @@ public class DAGTest { } + /** + * @author Echo009 + * @since 2021/02/07 + */ + @Test + public void testListReadyNodes1() { + // 双顶点 + // 1 -> 3 + // 2(x) -> 4 -> 5 + // 6(x) -> 7(x) -> 8(x) -> 4 + // 8(x) -> 9 + + List nodes1 = Lists.newLinkedList(); + List edges1 = Lists.newLinkedList(); + + nodes1.add(new PEWorkflowDAG.Node(1L, 1L, "1")); + nodes1.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false)); + nodes1.add(new PEWorkflowDAG.Node(3L, 3L, "3")); + nodes1.add(new PEWorkflowDAG.Node(4L, 4L, "4")); + nodes1.add(new PEWorkflowDAG.Node(5L, 5L, "5")); + nodes1.add(new PEWorkflowDAG.Node(6L, 6L, "6").setEnable(false)); + nodes1.add(new PEWorkflowDAG.Node(7L, 7L, "7").setEnable(false)); + nodes1.add(new PEWorkflowDAG.Node(8L, 8L, "8").setEnable(false)); + nodes1.add(new PEWorkflowDAG.Node(9L, 9L, "9")); + edges1.add(new PEWorkflowDAG.Edge(1L, 3L)); + edges1.add(new PEWorkflowDAG.Edge(2L, 4L)); + edges1.add(new PEWorkflowDAG.Edge(4L, 5L)); + edges1.add(new PEWorkflowDAG.Edge(4L, 5L)); + edges1.add(new PEWorkflowDAG.Edge(6L, 7L)); + edges1.add(new PEWorkflowDAG.Edge(7L, 8L)); + edges1.add(new PEWorkflowDAG.Edge(8L, 4L)); + edges1.add(new PEWorkflowDAG.Edge(8L, 9L)); + + PEWorkflowDAG dag1 = new PEWorkflowDAG(nodes1, edges1); + List readyNodeIds1 = WorkflowDAGUtils.listReadyNodes(dag1).stream().map(PEWorkflowDAG.Node::getNodeId).collect(Collectors.toList()); + + System.out.println(readyNodeIds1); + Assert.assertTrue(readyNodeIds1.contains(1L)); + Assert.assertTrue(readyNodeIds1.contains(4L)); + Assert.assertTrue(readyNodeIds1.contains(9L)); + + } + + /** + * @author Echo009 + * @since 2021/02/07 + */ + @Test + public void testListReadyNodes2() { + + // 注:(x) 代表 enable = false 的节点 + // 测试连续 move + // 1(x) -> 2(x) -> 3 -> 4 -> 5(x) -> 6 + + List nodes = Lists.newLinkedList(); + List edges = Lists.newLinkedList(); + + nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3")); + nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4")); + nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5").setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6")); + edges.add(new PEWorkflowDAG.Edge(1L, 2L)); + edges.add(new PEWorkflowDAG.Edge(2L, 3L)); + edges.add(new PEWorkflowDAG.Edge(3L, 4L)); + edges.add(new PEWorkflowDAG.Edge(4L, 5L)); + edges.add(new PEWorkflowDAG.Edge(5L, 6L)); + + PEWorkflowDAG dag = new PEWorkflowDAG(nodes, edges); + List readyNodeIds2 = WorkflowDAGUtils.listReadyNodes(dag).stream().map(PEWorkflowDAG.Node::getNodeId).collect(Collectors.toList()); + + System.out.println(readyNodeIds2); + + Assert.assertEquals(1, readyNodeIds2.size()); + Assert.assertTrue(readyNodeIds2.contains(3L)); + + } + + + /** + * @author Echo009 + * @since 2021/02/07 + */ + @Test + public void testListReadyNodes3() { + + // 注:(x) 代表 enable = false 的节点 + // 复杂 move + // 1(failed) -> 2(x) -> 4 -> 5(x) -> 6 + // 3(success) -> 4 + // 7 -> 6 + + List nodes = Lists.newLinkedList(); + List edges = Lists.newLinkedList(); + + nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setStatus(InstanceStatus.FAILED.getV())); + nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3").setStatus(InstanceStatus.SUCCEED.getV())); + nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4")); + nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5").setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6")); + nodes.add(new PEWorkflowDAG.Node(7L, 7L, "7")); + edges.add(new PEWorkflowDAG.Edge(1L, 2L)); + edges.add(new PEWorkflowDAG.Edge(2L, 4L)); + edges.add(new PEWorkflowDAG.Edge(3L, 4L)); + edges.add(new PEWorkflowDAG.Edge(4L, 5L)); + edges.add(new PEWorkflowDAG.Edge(5L, 6L)); + edges.add(new PEWorkflowDAG.Edge(7L, 6L)); + + PEWorkflowDAG dag = new PEWorkflowDAG(nodes, edges); + List readyNodeIds2 = WorkflowDAGUtils.listReadyNodes(dag).stream().map(PEWorkflowDAG.Node::getNodeId).collect(Collectors.toList()); + + System.out.println(readyNodeIds2); + + Assert.assertEquals(2, readyNodeIds2.size()); + Assert.assertTrue(readyNodeIds2.contains(4L)); + Assert.assertTrue(readyNodeIds2.contains(7L)); + + } + + + /** + * @author Echo009 + * @since 2021/02/07 + */ + @Test + public void testListReadyNodes4() { + + // 注:(x) 代表 enable = false 的节点 + // 复杂 move + // 1(failed) -> 2(x) -> 5 -> 6 + // 3(x) -> 5 + // 1(failed) -> 3(x) -> 4(x) -> 5 + // 4(x) -> 6 + + List nodes = Lists.newLinkedList(); + List edges = Lists.newLinkedList(); + + nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setStatus(InstanceStatus.FAILED.getV())); + nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3").setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4").setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5")); + nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6")); + edges.add(new PEWorkflowDAG.Edge(1L, 2L)); + edges.add(new PEWorkflowDAG.Edge(2L, 5L)); + edges.add(new PEWorkflowDAG.Edge(5L, 6L)); + edges.add(new PEWorkflowDAG.Edge(1L, 3L)); + edges.add(new PEWorkflowDAG.Edge(3L, 4L)); + edges.add(new PEWorkflowDAG.Edge(3L, 5L)); + edges.add(new PEWorkflowDAG.Edge(4L, 5L)); + + PEWorkflowDAG dag = new PEWorkflowDAG(nodes, edges); + List readyNodeIds2 = WorkflowDAGUtils.listReadyNodes(dag).stream().map(PEWorkflowDAG.Node::getNodeId).collect(Collectors.toList()); + + System.out.println(readyNodeIds2); + + Assert.assertEquals(1, readyNodeIds2.size()); + Assert.assertTrue(readyNodeIds2.contains(5L)); + + } + + + /** + * @author Echo009 + * @since 2021/02/07 + */ + @Test + public void testListReadyNodes5() { + + // 注:(x) 代表 enable = false 的节点 + // 复杂 move + // 1(failed) -> 2(x) -> 5 -> 6 + // 3(x) -> 5 + // 1(failed) -> 3(x) -> 4(x) -> 5 + // 4(x) -> 6 + // 4(x) -> 7 + + List nodes = Lists.newLinkedList(); + List edges = Lists.newLinkedList(); + + nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setStatus(InstanceStatus.FAILED.getV())); + nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3").setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4").setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5")); + nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6")); + nodes.add(new PEWorkflowDAG.Node(7L, 7L, "7")); + edges.add(new PEWorkflowDAG.Edge(1L, 2L)); + edges.add(new PEWorkflowDAG.Edge(2L, 5L)); + edges.add(new PEWorkflowDAG.Edge(5L, 6L)); + edges.add(new PEWorkflowDAG.Edge(1L, 3L)); + edges.add(new PEWorkflowDAG.Edge(3L, 4L)); + edges.add(new PEWorkflowDAG.Edge(3L, 5L)); + edges.add(new PEWorkflowDAG.Edge(4L, 5L)); + edges.add(new PEWorkflowDAG.Edge(4L, 7L)); + + PEWorkflowDAG dag = new PEWorkflowDAG(nodes, edges); + List readyNodeIds2 = WorkflowDAGUtils.listReadyNodes(dag).stream().map(PEWorkflowDAG.Node::getNodeId).collect(Collectors.toList()); + + System.out.println(readyNodeIds2); + + Assert.assertEquals(2, readyNodeIds2.size()); + Assert.assertTrue(readyNodeIds2.contains(5L)); + Assert.assertTrue(readyNodeIds2.contains(7L)); + + } }