From 55dbc86d29ba67156e3fdf36b4f21e33abeb7d48 Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 2 Jun 2020 20:14:49 +0800 Subject: [PATCH] [dev] change DAG description method --- README.md | 6 ++- .../oms/common/model/PEWorkflowDAG.java | 7 +++- .../server/common/utils/WorkflowDAGUtils.java | 19 +++++----- .../kfcfans/oms/server/model/WorkflowDAG.java | 4 +- .../workflow/WorkflowInstanceManager.java | 11 ++++-- .../workflow/WorkflowInstanceService.java | 19 ++++++---- .../kfcfans/oms/server/test/DAGTest.java | 20 +++++----- .../workflow/WorkflowStandaloneProcessor.java | 35 +++++++++++++++++ .../core/executor/ProcessorRunnable.java | 2 +- .../worker/core/processor/TaskContext.java | 38 +++++++++++-------- 10 files changed, 110 insertions(+), 51 deletions(-) create mode 100644 oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/samples/workflow/WorkflowStandaloneProcessor.java diff --git a/README.md b/README.md index 07a7d582..7728c576 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ ![logo](./others/images/oms-logo.png) -![Java CI with Maven](https://github.com/KFCFans/OhMyScheduler/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master) [![GitHub license](https://img.shields.io/github/license/KFCFans/OhMyScheduler)](https://github.com/KFCFans/OhMyScheduler/blob/master/LICENSE) + +

+ + +

OhMyScheduler is a powerful distributed scheduling platform and distributed computing framework based on Akka architecture.It provides you a chance to schedule job and distributed computing easily. diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java index 4d9cb4cc..4db453e8 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java @@ -34,8 +34,13 @@ public class PEWorkflowDAG { // 仅向前端输出时需要 private Long instanceId; - private boolean finished; + private int status; private String result; + + public Node(Long jobId, String jobName) { + this.jobId = jobId; + this.jobName = jobName; + } } // 边 jobId -> jobId diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java index b7503bde..16e507d1 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.server.common.utils; +import com.github.kfcfans.oms.common.InstanceStatus; import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.common.model.PEWorkflowDAG; import com.github.kfcfans.oms.common.utils.JsonUtils; @@ -38,7 +39,7 @@ public class WorkflowDAGUtils { // 创建节点 PEWorkflowDAG.getNodes().forEach(node -> { Long jobId = node.getJobId(); - WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName(), null, false, null); + WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName(), null, InstanceStatus.WAITING_DISPATCH.getV(), null); id2Node.put(jobId, n); // 初始阶段,每一个点都设为顶点 @@ -88,7 +89,7 @@ public class WorkflowDAGUtils { queue.addAll(node.getSuccessors()); // 添加点 - PEWorkflowDAG.Node peNode = new PEWorkflowDAG.Node(node.getJobId(), node.getJobName(), node.getInstanceId(), node.isFinished(), node.getResult()); + PEWorkflowDAG.Node peNode = new PEWorkflowDAG.Node(node.getJobId(), node.getJobName(), node.getInstanceId(), node.getStatus(), node.getResult()); nodes.add(peNode); // 添加线 @@ -111,7 +112,7 @@ public class WorkflowDAGUtils { // 检查所有顶点的路径 for (WorkflowDAG.Node root : workflowDAG.getRoots()) { - if (!check(root, Sets.newHashSet())) { + if (invalidPath(root, Sets.newHashSet())) { return false; } } @@ -121,21 +122,21 @@ public class WorkflowDAGUtils { return false; } - private static boolean check(WorkflowDAG.Node root, Set ids) { + private static boolean invalidPath(WorkflowDAG.Node root, Set ids) { // 递归出口(出现之前的节点则代表有环,失败;出现无后继者节点,则说明该路径成功) if (ids.contains(root.getJobId())) { - return false; + return true; } if (root.getSuccessors().isEmpty()) { - return true; + return false; } ids.add(root.getJobId()); for (WorkflowDAG.Node node : root.getSuccessors()) { - if (!check(node, Sets.newHashSet(ids))) { - return false; + if (invalidPath(node, Sets.newHashSet(ids))) { + return true; } } - return true; + return false; } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/model/WorkflowDAG.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/model/WorkflowDAG.java index e9a8d94f..15419809 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/model/WorkflowDAG.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/model/WorkflowDAG.java @@ -31,8 +31,8 @@ public class WorkflowDAG { private String jobName; // 运行时参数 - private Long instanceId; - private boolean finished; + private Long instanceId; // 任务实例ID + private int status; // 状态 WAITING_DISPATCH -> RUNNING -> SUCCEED/FAILED/STOPPED private String result; } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java index 8aa2084d..db9d5331 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java @@ -2,6 +2,7 @@ package com.github.kfcfans.oms.server.service.workflow; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; +import com.github.kfcfans.oms.common.InstanceStatus; import com.github.kfcfans.oms.common.SystemInstanceResult; import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.WorkflowInstanceStatus; @@ -126,6 +127,7 @@ public class WorkflowInstanceManager { roots.forEach(root -> { Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis()); root.setInstanceId(instanceId); + root.setStatus(InstanceStatus.RUNNING.getV()); }); // 持久化 @@ -178,7 +180,7 @@ public class WorkflowInstanceManager { while (!queue.isEmpty()) { WorkflowDAG.Node head = queue.poll(); if (instanceId.equals(head.getInstanceId())) { - head.setFinished(true); + head.setStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV()); head.setResult(result); log.debug("[Workflow-{}|{}] node(jobId={}) finished in workflowInstance, success={},result={}", wfId, wfInstanceId, head.getJobId(), success, result); @@ -208,8 +210,8 @@ public class WorkflowInstanceManager { AtomicBoolean allFinished = new AtomicBoolean(true); relyMap.keySet().forEach(jobId -> { - // 无需计算已完成节点 - if (jobId2Node.get(jobId).isFinished()) { + // 无需计算已完成节点(理论上此处不可能出现 FAILED 的情况) + if (jobId2Node.get(jobId).getStatus() == InstanceStatus.SUCCEED.getV()) { return; } allFinished.set(false); @@ -220,7 +222,7 @@ public class WorkflowInstanceManager { } // 判断某个任务所有依赖的完成情况,只要有一个未完成,即无法执行 for (Long reliedJobId : relyMap.get(jobId)) { - if (!jobId2Node.get(reliedJobId).isFinished()) { + if (jobId2Node.get(reliedJobId).getStatus() != InstanceStatus.SUCCEED.getV()) { return; } } @@ -232,6 +234,7 @@ public class WorkflowInstanceManager { Long newInstanceId = instanceService.create(jobId, wfInstance.getAppId(), JSONObject.toJSONString(preJobId2Result), wfInstanceId, System.currentTimeMillis()); jobId2Node.get(jobId).setInstanceId(newInstanceId); + jobId2Node.get(jobId).setStatus(InstanceStatus.RUNNING.getV()); jobId2InstanceId.put(jobId, newInstanceId); jobId2InstanceParams.put(jobId, JSONObject.toJSONString(preJobId2Result)); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java index 91690976..783240ce 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.server.service.workflow; import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.oms.common.InstanceStatus; import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.common.SystemInstanceResult; import com.github.kfcfans.oms.common.WorkflowInstanceStatus; @@ -45,13 +46,6 @@ public class WorkflowInstanceService { if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { throw new OmsException("workflow instance already stopped"); } - - // 修改数据库状态 - wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV()); - wfInstance.setResult(SystemInstanceResult.STOPPED_BY_USER); - wfInstance.setGmtModified(new Date()); - wfInstanceInfoRepository.saveAndFlush(wfInstance); - // 停止所有已启动且未完成的服务 WorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), WorkflowDAG.class); Queue queue = Queues.newLinkedBlockingQueue(); @@ -59,13 +53,22 @@ public class WorkflowInstanceService { while (!queue.isEmpty()) { WorkflowDAG.Node node = queue.poll(); - if (node.getInstanceId() != null && !node.isFinished()) { + if (node.getInstanceId() != null && node.getStatus() == InstanceStatus.RUNNING.getV()) { log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId()); + node.setStatus(InstanceStatus.STOPPED.getV()); + node.setResult(SystemInstanceResult.STOPPED_BY_USER); + instanceService.stopInstance(node.getInstanceId()); } queue.addAll(node.getSuccessors()); } + // 修改数据库状态 + wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV()); + wfInstance.setResult(SystemInstanceResult.STOPPED_BY_USER); + wfInstance.setGmtModified(new Date()); + wfInstanceInfoRepository.saveAndFlush(wfInstance); + log.info("[WfInstance-{}] stop workflow instance successfully~", wfInstanceId); } diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/DAGTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/DAGTest.java index a1d45db7..db780af9 100644 --- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/DAGTest.java +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/DAGTest.java @@ -25,8 +25,8 @@ public class DAGTest { List edges = Lists.newLinkedList(); // 图1: 1 -> 2 -> 1,理论上报错 - nodes.add(new PEWorkflowDAG.Node(1L, "1", null, false, null)); - nodes.add(new PEWorkflowDAG.Node(2L, "2", null, false, null)); + nodes.add(new PEWorkflowDAG.Node(1L, "1")); + nodes.add(new PEWorkflowDAG.Node(2L, "2")); edges.add(new PEWorkflowDAG.Edge(1L, 2L)); edges.add(new PEWorkflowDAG.Edge(2L, 1L)); System.out.println(WorkflowDAGUtils.valid(new PEWorkflowDAG(nodes, edges))); @@ -35,10 +35,10 @@ public class DAGTest { List nodes2 = Lists.newLinkedList(); List edges2 = Lists.newLinkedList(); - nodes2.add(new PEWorkflowDAG.Node(1L, "1", null, false, null)); - nodes2.add(new PEWorkflowDAG.Node(2L, "2", null, false, null)); - nodes2.add(new PEWorkflowDAG.Node(3L, "3", null, false, null)); - nodes2.add(new PEWorkflowDAG.Node(4L, "4", null, false, null)); + nodes2.add(new PEWorkflowDAG.Node(1L, "1")); + nodes2.add(new PEWorkflowDAG.Node(2L, "2")); + nodes2.add(new PEWorkflowDAG.Node(3L, "3")); + nodes2.add(new PEWorkflowDAG.Node(4L, "4")); edges2.add(new PEWorkflowDAG.Edge(1L, 2L)); edges2.add(new PEWorkflowDAG.Edge(1L, 3L)); edges2.add(new PEWorkflowDAG.Edge(2L, 4L)); @@ -66,10 +66,10 @@ public class DAGTest { List nodes3 = Lists.newLinkedList(); List edges3 = Lists.newLinkedList(); - nodes3.add(new PEWorkflowDAG.Node(1L, "1", null, false, null)); - nodes3.add(new PEWorkflowDAG.Node(2L, "2", null, false, null)); - nodes3.add(new PEWorkflowDAG.Node(3L, "3", null, false, null)); - nodes3.add(new PEWorkflowDAG.Node(4L, "4", null, false, null)); + nodes3.add(new PEWorkflowDAG.Node(1L, "1")); + nodes3.add(new PEWorkflowDAG.Node(2L, "2")); + nodes3.add(new PEWorkflowDAG.Node(3L, "3")); + nodes3.add(new PEWorkflowDAG.Node(4L, "4")); edges3.add(new PEWorkflowDAG.Edge(1L, 3L)); edges3.add(new PEWorkflowDAG.Edge(2L, 4L)); diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/samples/workflow/WorkflowStandaloneProcessor.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/samples/workflow/WorkflowStandaloneProcessor.java new file mode 100644 index 00000000..cafd35d1 --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/samples/workflow/WorkflowStandaloneProcessor.java @@ -0,0 +1,35 @@ +package com.github.kfcfans.oms.samples.workflow; + +import com.github.kfcfans.oms.common.utils.JsonUtils; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; +import com.github.kfcfans.oms.worker.log.OmsLogger; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * 工作流测试 + * + * @author tjq + * @since 2020/6/2 + */ +@Component +public class WorkflowStandaloneProcessor implements BasicProcessor { + + @Override + public ProcessResult process(TaskContext context) throws Exception { + OmsLogger logger = context.getOmsLogger(); + logger.info("current:" + JsonUtils.toJSONString(context)); + System.out.println("currentContext:"); + System.out.println(JsonUtils.toJSONString(context)); + + // 尝试获取上游任务 + Map upstreamTaskResult = context.fetchUpstreamTaskResult(); + System.out.println("工作流上游任务数据:"); + System.out.println(upstreamTaskResult); + + return new ProcessResult(true, context.getJobId() + " process successfully."); + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index ada8d32d..4ee62557 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -155,7 +155,7 @@ public class ProcessorRunnable implements Runnable { try { processResult = processor.process(taskContext); }catch (Exception e) { - log.warn("[ProcessorRunnable-{}] task({}) process failed.", instanceId, taskContext.getDescription(), e); + log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e); processResult = new ProcessResult(false, e.toString()); } reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg())); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java index 68239fe6..81b3bd88 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java @@ -1,21 +1,27 @@ package com.github.kfcfans.oms.worker.core.processor; +import com.github.kfcfans.oms.common.utils.JsonUtils; import com.github.kfcfans.oms.worker.log.OmsLogger; +import com.google.common.collect.Maps; import lombok.Getter; import lombok.Setter; +import lombok.ToString; + +import java.util.Map; /** * 任务上下文 * 概念统一,所有的worker只处理Task,Job和JobInstance的概念只存在于Server和TaskTracker - * 单机任务 -> 整个Job变成一个Task - * 广播任务 -> 整个jOb变成一堆一样的Task - * MR 任务 -> 被map出来的任务都视为根Task的子Task + * 单机任务:整个Job变成一个Task + * 广播任务:整个job变成一堆一样的Task + * MR 任务:被map出来的任务都视为根Task的子Task * * @author tjq * @since 2020/3/18 */ @Getter @Setter +@ToString public class TaskContext { private Long jobId; @@ -29,7 +35,9 @@ public class TaskContext { */ private String jobParams; /** - * 通过 OpenAPI 传递的参数 + * 任务实例运行中参数 + * 若该任务实例通过 OpenAPI 触发,则该值为 OpenAPI 传递的参数 + * 若该任务为工作流的某个节点,则该值为上游任务传递下来的数据,推荐通过 {@link TaskContext#fetchUpstreamTaskResult()} 方法获取 */ private String instanceParams; /** @@ -49,17 +57,17 @@ public class TaskContext { */ private OmsLogger omsLogger; - public String getDescription() { - return "subInstanceId='" + subInstanceId + '\'' + - ", instanceId='" + instanceId + '\'' + - ", taskId='" + taskId + '\'' + - ", taskName='" + taskName + '\'' + - ", jobParams='" + jobParams + '\'' + - ", instanceParams='" + instanceParams; - } - @Override - public String toString() { - return getDescription(); + /** + * 获取工作流上游任务传递的数据(仅该任务实例由工作流触发时存在) + * @return key: 上游任务的 jobId;value: 上游任务的 ProcessResult#result + */ + @SuppressWarnings("unchecked") + public Map fetchUpstreamTaskResult() { + try { + return (Map)JsonUtils.parseObject(instanceParams, Map.class); + }catch (Exception ignore) { + } + return Maps.newHashMap(); } }