diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java similarity index 84% rename from oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java index c1ef4ac9..b7503bde 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java @@ -1,8 +1,9 @@ -package com.github.kfcfans.oms.common.utils; +package com.github.kfcfans.oms.server.common.utils; import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.common.model.PEWorkflowDAG; -import com.github.kfcfans.oms.common.model.WorkflowDAG; +import com.github.kfcfans.oms.common.utils.JsonUtils; +import com.github.kfcfans.oms.server.model.WorkflowDAG; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; @@ -59,12 +60,14 @@ public class WorkflowDAGUtils { rootIds.remove(to.getJobId()); }); - // 合法性校验 - if (rootIds.size() != 1) { + // 合法性校验(至少存在一个顶点) + if (rootIds.size() < 1) { throw new OmsException("Illegal DAG Graph: " + JsonUtils.toJSONString(PEWorkflowDAG)); } - return new WorkflowDAG(id2Node.get(rootIds.iterator().next())); + List roots = Lists.newLinkedList(); + rootIds.forEach(id -> roots.add(id2Node.get(id))); + return new WorkflowDAG(roots); } /** @@ -78,7 +81,7 @@ public class WorkflowDAGUtils { List edges = Lists.newLinkedList(); Queue queue = Queues.newLinkedBlockingQueue(); - queue.add(dag.getRoot()); + queue.addAll(dag.getRoots()); while (!queue.isEmpty()) { WorkflowDAG.Node node = queue.poll(); @@ -105,7 +108,14 @@ public class WorkflowDAGUtils { public static boolean valid(PEWorkflowDAG peWorkflowDAG) { try { WorkflowDAG workflowDAG = convert(peWorkflowDAG); - return check(workflowDAG.getRoot(), Sets.newHashSet()); + + // 检查所有顶点的路径 + for (WorkflowDAG.Node root : workflowDAG.getRoots()) { + if (!check(root, Sets.newHashSet())) { + return false; + } + } + return true; }catch (Exception ignore) { } return false; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/WorkflowDAG.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/model/WorkflowDAG.java similarity index 85% rename from oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/WorkflowDAG.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/model/WorkflowDAG.java index a438963b..e9a8d94f 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/WorkflowDAG.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/model/WorkflowDAG.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.oms.common.model; +package com.github.kfcfans.oms.server.model; import lombok.AllArgsConstructor; import lombok.Data; @@ -18,7 +18,8 @@ import java.util.List; @AllArgsConstructor public class WorkflowDAG { - private Node root; + // DAG允许存在多个顶点 + private List roots; @Data @NoArgsConstructor diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/InstanceInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/InstanceInfoDO.java index 6464e827..5a2802dc 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/InstanceInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/InstanceInfoDO.java @@ -32,6 +32,8 @@ public class InstanceInfoDO { // 任务实例ID private Long instanceId; // 任务实例参数 + @Lob + @Column(columnDefinition="TEXT") private String instanceParams; // 该任务实例的类型,普通/工作流(InstanceType) diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java index 684facd5..d5ceec9a 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java @@ -2,6 +2,7 @@ package com.github.kfcfans.oms.server.service.ha; import akka.actor.ActorSelection; import akka.pattern.Patterns; +import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.common.response.AskResponse; import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.requests.Ping; @@ -55,7 +56,7 @@ public class ServerSelectService { // 无锁获取当前数据库中的Server Optional appInfoOpt = appInfoRepository.findById(appId); if (!appInfoOpt.isPresent()) { - throw new RuntimeException(appId + " is not registered!"); + throw new OmsException(appId + " is not registered!"); } String appName = appInfoOpt.get().getAppName(); String originServer = appInfoOpt.get().getCurrentServer(); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java index c6fcae1e..a87899da 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java @@ -6,8 +6,8 @@ import com.github.kfcfans.oms.common.SystemInstanceResult; import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.WorkflowInstanceStatus; import com.github.kfcfans.oms.common.model.PEWorkflowDAG; -import com.github.kfcfans.oms.common.model.WorkflowDAG; -import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils; +import com.github.kfcfans.oms.server.model.WorkflowDAG; +import com.github.kfcfans.oms.server.common.utils.WorkflowDAGUtils; import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO; @@ -121,11 +121,12 @@ public class WorkflowInstanceManager { WorkflowDAG workflowDAG = WorkflowDAGUtils.convert(JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class)); // 运行根任务,无法找到根任务则直接失败 - WorkflowDAG.Node root = workflowDAG.getRoot(); - - // 创建根任务实例 - Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis()); - root.setInstanceId(instanceId); + List roots = workflowDAG.getRoots(); + // 创建所有的根任务 + roots.forEach(root -> { + Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis()); + root.setInstanceId(instanceId); + }); // 持久化 wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV()); @@ -134,7 +135,7 @@ public class WorkflowInstanceManager { log.info("[Workflow-{}] start workflow successfully, wfInstanceId={}", wfInfo.getId(), wfInstanceId); // 真正开始执行根任务 - runInstance(root.getJobId(), instanceId, wfInstanceId, null); + roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, null)); }catch (Exception e) { wfInstanceInfo.setStatus(WorkflowInstanceStatus.FAILED.getV()); @@ -164,8 +165,6 @@ public class WorkflowInstanceManager { WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get(); Long wfId = wfInstance.getWorkflowId(); - log.debug("[Workflow-{}] one task in dag finished, wfInstanceId={},instanceId={},success={},result={}", wfId, wfInstanceId, instanceId, success, result); - try { WorkflowDAG dag = JSONObject.parseObject(wfInstance.getDag(), WorkflowDAG.class); @@ -175,7 +174,7 @@ public class WorkflowInstanceManager { // 层序遍历 DAG,更新完成节点的状态 Queue queue = Queues.newLinkedBlockingQueue(); - queue.add(dag.getRoot()); + queue.addAll(dag.getRoots()); while (!queue.isEmpty()) { WorkflowDAG.Node head = queue.poll(); if (instanceId.equals(head.getInstanceId())) { @@ -209,12 +208,16 @@ public class WorkflowInstanceManager { AtomicBoolean allFinished = new AtomicBoolean(true); relyMap.keySet().forEach(jobId -> { - // 如果该任务本身已经完成,不需要再计算,直接跳过 + // 无需计算已完成节点 if (jobId2Node.get(jobId).isFinished()) { return; } - allFinished.set(false); + + // 存在 instanceId,代表任务已派发过,无需再次计算 + if (jobId2Node.get(jobId).getInstanceId() != null) { + return; + } // 判断某个任务所有依赖的完成情况,只要有一个未完成,即无法执行 for (Long reliedJobId : relyMap.get(jobId)) { if (!jobId2Node.get(reliedJobId).isFinished()) { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java index 325e3c5a..57fbb113 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java @@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.common.SystemInstanceResult; import com.github.kfcfans.oms.common.WorkflowInstanceStatus; -import com.github.kfcfans.oms.common.model.WorkflowDAG; +import com.github.kfcfans.oms.server.model.WorkflowDAG; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO; import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.oms.server.service.instance.InstanceService; @@ -44,7 +44,7 @@ public class WorkflowInstanceService { throw new OmsException("Permission Denied!"); } if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { - throw new OmsException("already stopped"); + throw new OmsException("workflow instance already stopped"); } // 修改数据库状态 @@ -56,7 +56,7 @@ public class WorkflowInstanceService { // 停止所有已启动且未完成的服务 WorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), WorkflowDAG.class); Queue queue = Queues.newLinkedBlockingQueue(); - queue.add(workflowDAG.getRoot()); + queue.addAll(workflowDAG.getRoots()); while (!queue.isEmpty()) { WorkflowDAG.Node node = queue.poll(); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java index 5afd75b4..79f707bc 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java @@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest; -import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils; +import com.github.kfcfans.oms.server.common.utils.WorkflowDAGUtils; import com.github.kfcfans.oms.server.common.SJ; import com.github.kfcfans.oms.server.common.constans.SwitchableStatus; import com.github.kfcfans.oms.server.common.utils.CronExpression; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInfoVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInfoVO.java index 605e3992..e6050fa9 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInfoVO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInfoVO.java @@ -1,8 +1,8 @@ package com.github.kfcfans.oms.server.web.response; +import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.model.PEWorkflowDAG; -import com.github.kfcfans.oms.common.utils.JsonUtils; import com.github.kfcfans.oms.server.common.SJ; import com.github.kfcfans.oms.server.common.constans.SwitchableStatus; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; @@ -54,7 +54,7 @@ public class WorkflowInfoVO { vo.enable = SwitchableStatus.of(wfDO.getStatus()) == SwitchableStatus.ENABLE; vo.setTimeExpressionType(TimeExpressionType.of(wfDO.getTimeExpressionType()).name()); - vo.setPEWorkflowDAG(JsonUtils.parseObjectUnsafe(wfDO.getPeDAG(), PEWorkflowDAG.class)); + vo.setPEWorkflowDAG(JSONObject.parseObject(wfDO.getPeDAG(), PEWorkflowDAG.class)); if (!StringUtils.isEmpty(wfDO.getNotifyUserIds())) { vo.setNotifyUserIds(SJ.commaSplitter.splitToList(wfDO.getNotifyUserIds()).stream().map(Long::valueOf).collect(Collectors.toList())); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInstanceInfoVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInstanceInfoVO.java index 743b0dca..f5959428 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInstanceInfoVO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInstanceInfoVO.java @@ -1,11 +1,11 @@ package com.github.kfcfans.oms.server.web.response; +import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.oms.common.OmsConstant; import com.github.kfcfans.oms.common.WorkflowInstanceStatus; import com.github.kfcfans.oms.common.model.PEWorkflowDAG; -import com.github.kfcfans.oms.common.model.WorkflowDAG; -import com.github.kfcfans.oms.common.utils.JsonUtils; -import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils; +import com.github.kfcfans.oms.server.common.utils.WorkflowDAGUtils; +import com.github.kfcfans.oms.server.model.WorkflowDAG; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO; import lombok.Data; import org.apache.commons.lang3.time.DateFormatUtils; @@ -45,7 +45,7 @@ public class WorkflowInstanceInfoVO { vo.setWorkflowName(workflowName); vo.setStatusStr(WorkflowInstanceStatus.of(wfInstanceDO.getStatus()).getDes()); - vo.setPEWorkflowDAG(WorkflowDAGUtils.convert2PE(JsonUtils.parseObjectUnsafe(wfInstanceDO.getDag(), WorkflowDAG.class))); + vo.setPEWorkflowDAG(WorkflowDAGUtils.convert2PE(JSONObject.parseObject(wfInstanceDO.getDag(), WorkflowDAG.class))); // JS精度丢失问题 vo.setWfInstanceId(String.valueOf(wfInstanceDO.getWfInstanceId())); diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/DAGTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/DAGTest.java index fa4dc844..a1d45db7 100644 --- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/DAGTest.java +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/DAGTest.java @@ -2,9 +2,9 @@ package com.github.kfcfans.oms.server.test; import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.oms.common.model.PEWorkflowDAG; -import com.github.kfcfans.oms.common.model.WorkflowDAG; +import com.github.kfcfans.oms.server.model.WorkflowDAG; import com.github.kfcfans.oms.common.utils.JsonUtils; -import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils; +import com.github.kfcfans.oms.server.common.utils.WorkflowDAGUtils; import com.google.common.collect.Lists; import org.junit.Test; @@ -61,6 +61,23 @@ public class DAGTest { // 打断点看 reference 关系 System.out.println(wfDAGByJackSon); System.out.println(wfDAGByFastJSON); + + // 测试图三(双顶点) 1 -> 3, 2 -> 4 + List nodes3 = Lists.newLinkedList(); + List edges3 = Lists.newLinkedList(); + + nodes3.add(new PEWorkflowDAG.Node(1L, "1", null, false, null)); + nodes3.add(new PEWorkflowDAG.Node(2L, "2", null, false, null)); + nodes3.add(new PEWorkflowDAG.Node(3L, "3", null, false, null)); + nodes3.add(new PEWorkflowDAG.Node(4L, "4", null, false, null)); + edges3.add(new PEWorkflowDAG.Edge(1L, 3L)); + edges3.add(new PEWorkflowDAG.Edge(2L, 4L)); + + PEWorkflowDAG multiRootPEDAG = new PEWorkflowDAG(nodes3, edges3); + System.out.println(WorkflowDAGUtils.valid(multiRootPEDAG)); + WorkflowDAG multiRootDAG = WorkflowDAGUtils.convert(multiRootPEDAG); + System.out.println(multiRootDAG); + }