From 897a493a5e9bc6b19b8bbf2d35889d24be393a30 Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 26 May 2020 14:45:24 +0800 Subject: [PATCH] [dev] develop the dag converter --- ...{PLWorkflowDAG.java => PEWorkflowDAG.java} | 4 +- .../kfcfans/oms/common/model/WorkflowDAG.java | 6 +++ .../request/http/SaveWorkflowRequest.java | 4 +- .../oms/common/utils/WorkflowDAGUtils.java | 47 +++++++++++++++---- .../core/model/WorkflowInfoDO.java | 2 +- .../repository/WorkflowInfoRepository.java | 6 +++ ...leService.java => OmsScheduleService.java} | 30 +++++++++--- .../workflow/WorkflowInstanceManager.java | 36 ++++++++++++++ .../service/workflow/WorkflowService.java | 8 +++- .../web/ControllerExceptionHandler.java | 3 +- .../web/controller/WorkflowController.java | 2 +- 11 files changed, 126 insertions(+), 22 deletions(-) rename oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/{PLWorkflowDAG.java => PEWorkflowDAG.java} (92%) rename oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/{JobScheduleService.java => OmsScheduleService.java} (91%) create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PLWorkflowDAG.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java similarity index 92% rename from oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PLWorkflowDAG.java rename to oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java index 891d35b3..894de996 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PLWorkflowDAG.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java @@ -7,7 +7,7 @@ import lombok.NoArgsConstructor; import java.util.List; /** - * Point & Line DAG 表示法 + * Point & Edge DAG 表示法 * 点 + 线,易于表达和传播 * * @author tjq @@ -16,7 +16,7 @@ import java.util.List; @Data @NoArgsConstructor @AllArgsConstructor -public class PLWorkflowDAG { +public class PEWorkflowDAG { // DAG 图(点线表示法) private List nodes; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/WorkflowDAG.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/WorkflowDAG.java index e69a06e7..b364a3dc 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/WorkflowDAG.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/WorkflowDAG.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.common.model; +import com.github.kfcfans.oms.common.InstanceStatus; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -28,5 +29,10 @@ public class WorkflowDAG { private List successors; private Long jobId; private String jobName; + + // 运行时参数 + private Long instanceId; + private InstanceStatus status; + private String result; } } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java index a241d220..072eed82 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java @@ -1,6 +1,6 @@ package com.github.kfcfans.oms.common.request.http; -import com.github.kfcfans.oms.common.model.PLWorkflowDAG; +import com.github.kfcfans.oms.common.model.PEWorkflowDAG; import lombok.Data; /** @@ -21,7 +21,7 @@ public class SaveWorkflowRequest { private Long appId; // 点线表示法 - private PLWorkflowDAG plWorkflowDAG; + private PEWorkflowDAG pEWorkflowDAG; /* ************************** 定时参数 ************************** */ // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java index e7508244..7233c1a5 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java @@ -1,7 +1,7 @@ package com.github.kfcfans.oms.common.utils; import com.github.kfcfans.oms.common.OmsException; -import com.github.kfcfans.oms.common.model.PLWorkflowDAG; +import com.github.kfcfans.oms.common.model.PEWorkflowDAG; import com.github.kfcfans.oms.common.model.WorkflowDAG; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -20,21 +20,21 @@ public class WorkflowDAGUtils { /** * 将点线表示法的DAG图转化为引用表达法的DAG图 - * @param plWorkflowDAG 点线表示法的DAG图 + * @param PEWorkflowDAG 点线表示法的DAG图 * @return 引用表示法的DAG图 */ - public static WorkflowDAG convert(PLWorkflowDAG plWorkflowDAG) { + public static WorkflowDAG convert(PEWorkflowDAG PEWorkflowDAG) { Set rootIds = Sets.newHashSet(); Map id2Node = Maps.newHashMap(); - if (plWorkflowDAG.getNodes() == null || plWorkflowDAG.getNodes().isEmpty()) { + if (PEWorkflowDAG.getNodes() == null || PEWorkflowDAG.getNodes().isEmpty()) { throw new OmsException("empty graph"); } // 创建节点 - plWorkflowDAG.getNodes().forEach(node -> { + PEWorkflowDAG.getNodes().forEach(node -> { Long jobId = node.getJobId(); - WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName()); + WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName(), null, null, null); id2Node.put(jobId, n); // 初始阶段,每一个点都设为顶点 @@ -42,7 +42,7 @@ public class WorkflowDAGUtils { }); // 连接图像 - plWorkflowDAG.getEdges().forEach(edge -> { + PEWorkflowDAG.getEdges().forEach(edge -> { WorkflowDAG.Node from = id2Node.get(edge.getFrom()); WorkflowDAG.Node to = id2Node.get(edge.getTo()); @@ -58,10 +58,41 @@ public class WorkflowDAGUtils { // 合法性校验 if (rootIds.size() != 1) { - throw new OmsException("Illegal DAG Graph: " + JsonUtils.toJSONString(plWorkflowDAG)); + throw new OmsException("Illegal DAG Graph: " + JsonUtils.toJSONString(PEWorkflowDAG)); } return new WorkflowDAG(id2Node.get(rootIds.iterator().next())); } + /** + * 校验 DAG 是否有效 + * @param peWorkflowDAG 点线表示法的 DAG 图 + * @return true/false + */ + public static boolean valid(PEWorkflowDAG peWorkflowDAG) { + try { + WorkflowDAG workflowDAG = convert(peWorkflowDAG); + return check(workflowDAG.getRoot(), Sets.newHashSet()); + }catch (Exception ignore) { + } + return false; + } + + private static boolean check(WorkflowDAG.Node root, Set ids) { + + // 递归出口(出现之前的节点则代表有环,失败;出现无后继者节点,则说明该路径成功) + if (ids.contains(root.getJobId())) { + return false; + } + if (root.getSuccessors().isEmpty()) { + return true; + } + ids.add(root.getJobId()); + for (WorkflowDAG.Node node : root.getSuccessors()) { + if (!check(node, Sets.newHashSet(ids))) { + return false; + } + } + return true; + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/WorkflowInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/WorkflowInfoDO.java index 74f68e2e..0973cc11 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/WorkflowInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/WorkflowInfoDO.java @@ -31,7 +31,7 @@ public class WorkflowInfoDO { private Long appId; // 工作流的DAG图信息(点线式DAG的json) - private String plDAG; + private String peDAG; /* ************************** 定时参数 ************************** */ // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInfoRepository.java index 2854c5d3..0978050d 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInfoRepository.java @@ -3,6 +3,8 @@ package com.github.kfcfans.oms.server.persistence.core.repository; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; import org.springframework.data.jpa.repository.JpaRepository; +import java.util.List; + /** * DAG 工作流 数据操作层 * @@ -10,4 +12,8 @@ import org.springframework.data.jpa.repository.JpaRepository; * @since 2020/5/26 */ public interface WorkflowInfoRepository extends JpaRepository { + + + List findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List appIds, int status, int timeExpressionType, long time); + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/OmsScheduleService.java similarity index 91% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/OmsScheduleService.java index 53e83149..04fdb9a6 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/OmsScheduleService.java @@ -1,20 +1,22 @@ package com.github.kfcfans.oms.server.service.timing.schedule; import com.github.kfcfans.oms.common.InstanceStatus; -import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.common.TimeExpressionType; -import com.github.kfcfans.oms.server.common.utils.CronExpression; -import com.github.kfcfans.oms.server.service.JobService; import com.github.kfcfans.oms.server.akka.OhMyServer; +import com.github.kfcfans.oms.server.common.constans.JobStatus; +import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository; -import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository; +import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository; +import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository; import com.github.kfcfans.oms.server.service.DispatchService; -import com.github.kfcfans.oms.server.service.id.IdGenerateService; +import com.github.kfcfans.oms.server.service.JobService; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; +import com.github.kfcfans.oms.server.service.id.IdGenerateService; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -43,7 +45,7 @@ import java.util.stream.Collectors; */ @Slf4j @Service -public class JobScheduleService { +public class OmsScheduleService { private static final int MAX_BATCH_NUM = 10; @@ -56,6 +58,8 @@ public class JobScheduleService { @Resource private JobInfoRepository jobInfoRepository; @Resource + private WorkflowInfoRepository workflowInfoRepository; + @Resource private InstanceInfoRepository instanceInfoRepository; @Resource @@ -188,6 +192,19 @@ public class JobScheduleService { }); } + private void scheduleWorkflow(List appIds) { + + long nowTime = System.currentTimeMillis(); + long timeThreshold = nowTime + 2 * SCHEDULE_RATE; + Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> { + List wfInfos = workflowInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); + + if (CollectionUtils.isEmpty(wfInfos)) { + return; + } + }); + } + private void scheduleFrequentJob(List appIds) { Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> { @@ -216,4 +233,5 @@ public class JobScheduleService { } }); } + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java new file mode 100644 index 00000000..afacccb2 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java @@ -0,0 +1,36 @@ +package com.github.kfcfans.oms.server.service.workflow; + +import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; +import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository; +import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository; +import com.github.kfcfans.oms.server.service.DispatchService; +import com.google.common.collect.Maps; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Map; + +/** + * 管理运行中的工作流实例 + * + * @author tjq + * @since 2020/5/26 + */ +@Service +public class WorkflowInstanceManager { + + @Resource + private DispatchService dispatchService; + + @Resource + private JobInfoRepository jobInfoRepository; + @Resource + private WorkflowInfoRepository workflowInfoRepository; + + + private final Map wfId2Info = Maps.newConcurrentMap(); + + public void submit(WorkflowInfoDO wfInfo) { + wfId2Info.put(wfInfo.getId(), wfInfo); + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java index 0c6a1432..81d48555 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java @@ -1,8 +1,10 @@ package com.github.kfcfans.oms.server.service.workflow; +import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.oms.common.utils.JsonUtils; +import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils; import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository; @@ -32,6 +34,10 @@ public class WorkflowService { */ public Long saveWorkflow(SaveWorkflowRequest req) throws Exception { + if (!WorkflowDAGUtils.valid(req.getPEWorkflowDAG())) { + throw new OmsException("illegal DAG"); + } + Long wfId = req.getId(); WorkflowInfoDO wf; if (wfId == null) { @@ -43,7 +49,7 @@ public class WorkflowService { BeanUtils.copyProperties(req, wf); wf.setGmtModified(new Date()); - wf.setPlDAG(JsonUtils.toJSONString(req.getPlWorkflowDAG())); + wf.setPeDAG(JsonUtils.toJSONString(req.getPEWorkflowDAG())); // 计算 NextTriggerTime TimeExpressionType timeExpressionType = TimeExpressionType.of(req.getTimeExpressionType()); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/ControllerExceptionHandler.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/ControllerExceptionHandler.java index 5abb3a63..c4c52ab4 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/ControllerExceptionHandler.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/ControllerExceptionHandler.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.server.web; +import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.common.response.ResultDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.ControllerAdvice; @@ -21,7 +22,7 @@ public class ControllerExceptionHandler { public ResultDTO exceptionHandler(Exception e) { // 不是所有异常都需要打印完整堆栈,后续可以定义内部的Exception,便于判断 - if (e instanceof IllegalArgumentException) { + if (e instanceof IllegalArgumentException || e instanceof OmsException) { log.warn("[ControllerException] http request failed, message is {}.", e.getMessage()); }else { log.error("[ControllerException] http request failed.", e); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowController.java index c7998276..43c8c85b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowController.java @@ -24,7 +24,7 @@ public class WorkflowController { private WorkflowService workflowService; @PostMapping("/save") - public ResultDTO save(@RequestBody SaveWorkflowRequest req) { + public ResultDTO save(@RequestBody SaveWorkflowRequest req) throws Exception { return ResultDTO.success(workflowService.saveWorkflow(req)); }