From d1b3aebb417cf77957fef8a11da9f04ed12c7915 Mon Sep 17 00:00:00 2001
From: tjq
Date: Wed, 3 Jun 2020 17:02:05 +0800
Subject: [PATCH] [opt] optimize the workflowInstance, try to remove
 ReferenceDAG

---
 .../server/common/utils/WorkflowDAGUtils.java | 10 +++
 .../service/instance/InstanceManager.java     |  2 +-
 .../workflow/WorkflowInstanceManager.java     | 67 ++++++++++++-------
 3 files changed, 54 insertions(+), 25 deletions(-)

diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java
index 16e507d1..e8d32671 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java
@@ -107,6 +107,16 @@ public class WorkflowDAGUtils {
      * @return true/false
      */
     public static boolean valid(PEWorkflowDAG peWorkflowDAG) {
+
+        // Duplicate nodes are not allowed: a job may appear at most once in a workflow
+        Set<Long> jobIds = Sets.newHashSet();
+        for (PEWorkflowDAG.Node n : peWorkflowDAG.getNodes()) {
+            if (jobIds.contains(n.getJobId())) {
+                return false;
+            }
+            jobIds.add(n.getJobId());
+        }
+
         try {
             WorkflowDAG workflowDAG = convert(peWorkflowDAG);
 
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java
index f5915b00..0b43618e 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java
@@ -170,7 +170,7 @@ public class InstanceManager {
         // Special handling for workflows
         if (wfInstanceId != null) {
             // Inside a workflow, a manual stop is also treated as a failure (in theory this should never happen)
-            getWorkflowInstanceManager().move(wfInstanceId, instanceId, status == InstanceStatus.SUCCEED, result);
+            getWorkflowInstanceManager().move(wfInstanceId, instanceId, status, result);
         }
 
         // Alerting
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java
index 71e14591..8749ef8c 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java
@@ -26,7 +26,6 @@ import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Manages running workflow instances
@@ -129,6 +128,8 @@ public class WorkflowInstanceManager {
             Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis());
             root.setInstanceId(instanceId);
             root.setStatus(InstanceStatus.RUNNING.getV());
+
+            log.info("[Workflow-{}|{}] create root instance(jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, root.getJobId(), instanceId);
         });
 
         // Persist
@@ -155,10 +156,15 @@ public class WorkflowInstanceManager {
      * Move to the next step (called when one job instance of the workflow finishes)
      * @param wfInstanceId workflow instance ID
      * @param instanceId   ID of the job instance that just finished
-     * @param success      whether the finished job instance succeeded
+     * @param status       status of the finished job instance (SUCCEED/FAILED/STOPPED)
      * @param result       result of the finished job instance
      */
-    public void move(Long wfInstanceId, Long instanceId, boolean success, String result) {
+    public void move(Long wfInstanceId, Long instanceId, InstanceStatus status, String result) {
+
+        // For a manual stop the DAG data has already been updated, nothing more to do here
+        if (status == InstanceStatus.STOPPED) {
+            return;
+        }
 
         int lockId = wfInstanceId.hashCode();
         try {
@@ -172,6 +178,8 @@ public class WorkflowInstanceManager {
             WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get();
             Long wfId = wfInstance.getWorkflowId();
 
+            log.debug("[Workflow-{}|{}] instanceId={},status={},result={}.", wfId, wfInstanceId, instanceId, status, result);
+
             try {
                 WorkflowDAG dag = JSONObject.parseObject(wfInstance.getDag(), WorkflowDAG.class);
@@ -182,23 +190,37 @@ public class WorkflowInstanceManager {
                 // Level-order (BFS) traversal of the DAG, updating the status of the finished node
                 Queue<WorkflowDAG.Node> queue = Queues.newLinkedBlockingQueue();
                 queue.addAll(dag.getRoots());
+
+                boolean allFinished = true;
                 while (!queue.isEmpty()) {
                     WorkflowDAG.Node head = queue.poll();
                     if (instanceId.equals(head.getInstanceId())) {
-                        head.setStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
+                        head.setStatus(status.getV());
                         head.setResult(result);
 
-                        log.debug("[Workflow-{}|{}] node(jobId={}) finished in workflowInstance, success={},result={}", wfId, wfInstanceId, head.getJobId(), success, result);
+                        log.debug("[Workflow-{}|{}] node(jobId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, head.getJobId(), status.name(), result);
                     }
                     queue.addAll(head.getSuccessors());
 
+                    if (head.getStatus() == InstanceStatus.WAITING_DISPATCH.getV() || head.getStatus() == InstanceStatus.RUNNING.getV()) {
+                        allFinished = false;
+                    }
+
                     jobId2Node.put(head.getJobId(), head);
                     head.getSuccessors().forEach(n -> relyMap.put(n.getJobId(), head.getJobId()));
                 }
 
+                wfInstance.setGmtModified(new Date());
+                wfInstance.setDag(JSONObject.toJSONString(dag));
+                // The workflow has already finished (a failed node already failed it as a whole), so only update the latest DAG
+                if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
+                    workflowInstanceInfoRepository.saveAndFlush(wfInstance);
+                    log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", wfId, wfInstanceId, wfInstance.getStatus());
+                    return;
+                }
+
                 // A job failed, so the DAG flow is interrupted and the whole workflow fails
-                if (!success) {
-                    wfInstance.setDag(JSONObject.toJSONString(dag));
+                if (status == InstanceStatus.FAILED) {
                     wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV());
                     wfInstance.setResult(SystemInstanceResult.MIDDLE_JOB_FAILED);
                     wfInstance.setFinishedTime(System.currentTimeMillis());
@@ -208,21 +230,26 @@ public class WorkflowInstanceManager {
                     return;
                 }
 
+                // The workflow finished (reaching this point means every sub-job in the workflow succeeded)
+                if (allFinished) {
+                    wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
+                    // The result of the final job becomes the result of the whole workflow
+                    wfInstance.setResult(result);
+                    wfInstance.setFinishedTime(System.currentTimeMillis());
+                    workflowInstanceInfoRepository.saveAndFlush(wfInstance);
+
+                    log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
+                    return;
+                }
+
                 // Recompute the jobs that need to be dispatched
                 Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
                 Map<Long, String> jobId2InstanceParams = Maps.newHashMap();
-                AtomicBoolean allFinished = new AtomicBoolean(true);
                 relyMap.keySet().forEach(jobId -> {
-                    // No need to process finished nodes (in theory FAILED cannot appear here)
-                    if (jobId2Node.get(jobId).getStatus() == InstanceStatus.SUCCEED.getV()) {
-                        return;
-                    }
-                    allFinished.set(false);
-
-                    // An existing instanceId means the job has already been dispatched, no need to compute it again
-                    if (jobId2Node.get(jobId).getInstanceId() != null) {
+                    // Skip finished nodes (in theory FAILED cannot appear here) and already-dispatched nodes (those with an instanceId)
+                    if (jobId2Node.get(jobId).getStatus() == InstanceStatus.SUCCEED.getV() || jobId2Node.get(jobId).getInstanceId() != null) {
                         return;
                     }
                     // Check whether all of this job's dependencies have finished; if any one is unfinished, the job cannot run yet
@@ -247,14 +274,6 @@ public class WorkflowInstanceManager {
                     log.debug("[Workflow-{}|{}] workflowInstance start to process new node(jobId={},instanceId={})", wfId, wfInstanceId, jobId, newInstanceId);
                 });
 
-                if (allFinished.get()) {
-                    wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
-                    // The result of the final job becomes the result of the whole workflow
-                    wfInstance.setResult(result);
-                    wfInstance.setFinishedTime(System.currentTimeMillis());
-
-                    log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
-                }
                 wfInstance.setDag(JSONObject.toJSONString(dag));
                 workflowInstanceInfoRepository.saveAndFlush(wfInstance);
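Note: the WorkflowDAGUtils.valid() hunk above rejects any DAG in which the same jobId is attached to more than one node. Below is a minimal plain-JDK sketch of that guard; DuplicateJobIdCheck, hasNoDuplicateJobIds and jobIdsOfNodes are illustrative names invented for the sketch, not part of the project, and Set.add stands in for the patch's contains-then-add pair.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-JDK sketch of the duplicate-jobId guard added to WorkflowDAGUtils#valid.
// "jobIdsOfNodes" stands in for the jobIds of peWorkflowDAG.getNodes().
final class DuplicateJobIdCheck {

    static boolean hasNoDuplicateJobIds(List<Long> jobIdsOfNodes) {
        Set<Long> seen = new HashSet<>();
        for (Long jobId : jobIdsOfNodes) {
            // Set.add returns false if the element was already present,
            // i.e. the "same job appears twice" case the patch rejects.
            if (!seen.add(jobId)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(hasNoDuplicateJobIds(List.of(1L, 2L, 3L))); // true
        System.out.println(hasNoDuplicateJobIds(List.of(1L, 2L, 1L))); // false
    }
}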
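Note: the move() changes are spread over several hunks, so the following self-contained sketch condenses the new decision order: STOPPED returns early, a single BFS both marks the finished node and computes allFinished (replacing the removed AtomicBoolean), and then the already-finished / FAILED / all-succeeded / re-dispatch branches run in that sequence. Node, Status, MoveFlowSketch and workflowStillRunning are simplified stand-ins invented for this sketch, not the project's WorkflowDAG, InstanceStatus or WorkflowInstanceStatus classes.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Simplified stand-ins for the project's DAG node and instance status.
enum Status { WAITING_DISPATCH, RUNNING, SUCCEED, FAILED, STOPPED }

class Node {
    final long jobId;
    Long instanceId;                       // null until the node has been dispatched
    Status status = Status.WAITING_DISPATCH;
    String result;
    final List<Node> successors = new ArrayList<>();
    Node(long jobId) { this.jobId = jobId; }
}

class MoveFlowSketch {

    static String move(List<Node> roots, boolean workflowStillRunning,
                       long finishedInstanceId, Status status, String result) {
        // 1. A manually stopped instance has already updated the DAG elsewhere.
        if (status == Status.STOPPED) {
            return "ignored";
        }

        // 2. Single BFS pass: mark the finished node and compute allFinished on the fly.
        boolean allFinished = true;
        Queue<Node> queue = new ArrayDeque<>(roots);
        Set<Node> visited = new HashSet<>(roots);
        while (!queue.isEmpty()) {
            Node head = queue.poll();
            if (head.instanceId != null && head.instanceId == finishedInstanceId) {
                head.status = status;
                head.result = result;
            }
            if (head.status == Status.WAITING_DISPATCH || head.status == Status.RUNNING) {
                allFinished = false;
            }
            for (Node next : head.successors) {
                if (visited.add(next)) {
                    queue.add(next);
                }
            }
        }

        // 3. Workflow already left the "generalized running" states: persist the DAG only.
        if (!workflowStillRunning) {
            return "dag persisted only";
        }
        // 4. A FAILED node fails the whole workflow.
        if (status == Status.FAILED) {
            return "workflow failed";
        }
        // 5. Every node finished successfully: the last result becomes the workflow result.
        if (allFinished) {
            return "workflow succeeded: " + result;
        }
        // 6. Otherwise recompute which undispatched nodes have all dependencies
        //    SUCCEED and dispatch them (omitted in this sketch).
        return "dispatch ready successors";
    }

    public static void main(String[] args) {
        Node a = new Node(1);
        Node b = new Node(2);
        a.successors.add(b);
        a.instanceId = 100L;
        a.status = Status.RUNNING;
        // a just succeeded, b is still waiting -> successors get dispatched next.
        System.out.println(move(List.of(a), true, 100L, Status.SUCCEED, "ok"));
    }
}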