From 5a7af55cef258794bc08674c1e763b2d8f572019 Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 26 May 2020 12:06:03 +0800 Subject: [PATCH] [dev] design the DAG model --- .../github/kfcfans/oms/client/OhMyClient.java | 5 +- .../oms/client/OmsOpenApiException.java | 29 -------- .../kfcfans/oms/common/OmsException.java | 29 ++++++++ .../oms/common/model/PLWorkflowDAG.java | 42 ++++++++++++ .../kfcfans/oms/common/model/WorkflowDAG.java | 32 +++++++++ .../request/http/SaveWorkflowRequest.java | 39 +++++++++++ .../oms/common/utils/WorkflowDAGUtils.java | 67 +++++++++++++++++++ .../core/model/WorkflowInfoDO.java | 52 ++++++++++++++ .../repository/WorkflowInfoRepository.java | 13 ++++ .../service/workflow/WorkflowService.java | 60 +++++++++++++++++ .../web/controller/WorkflowController.java | 31 +++++++++ .../oms/worker/common/OmsWorkerException.java | 29 -------- .../oms/worker/container/OmsJarContainer.java | 6 +- 13 files changed, 371 insertions(+), 63 deletions(-) delete mode 100644 oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OmsOpenApiException.java create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/OmsException.java create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PLWorkflowDAG.java create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/WorkflowDAG.java create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/WorkflowInfoDO.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInfoRepository.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowController.java delete mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OmsWorkerException.java diff --git a/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java b/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java index eed52df1..1d061784 100644 --- a/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java +++ b/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.client; import com.github.kfcfans.oms.common.InstanceStatus; +import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.common.OpenAPIConstant; import com.github.kfcfans.oms.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.oms.common.response.InstanceInfoDTO; @@ -72,7 +73,7 @@ public class OhMyClient { } if (StringUtils.isEmpty(currentAddress)) { - throw new OmsOpenApiException("no server available"); + throw new OmsException("no server available"); } log.info("[OhMyClient] {}'s oms-client bootstrap successfully.", appName); } @@ -250,6 +251,6 @@ public class OhMyClient { } log.error("[OhMyClient] no server available in {}.", allAddress); - throw new OmsOpenApiException("no server available"); + throw new OmsException("no server available"); } } diff --git a/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OmsOpenApiException.java b/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OmsOpenApiException.java deleted file mode 100644 index 02389c9f..00000000 --- a/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OmsOpenApiException.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.github.kfcfans.oms.client; - -/** - * 异常 - * - * @author tjq - * @since 2020/5/14 - */ -public class OmsOpenApiException extends RuntimeException { - - public OmsOpenApiException() { - } - - public OmsOpenApiException(String message) { - super(message); - } - - public OmsOpenApiException(String message, Throwable cause) { - super(message, cause); - } - - public OmsOpenApiException(Throwable cause) { - super(cause); - } - - public OmsOpenApiException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/OmsException.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/OmsException.java new file mode 100644 index 00000000..f0ca2519 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/OmsException.java @@ -0,0 +1,29 @@ +package com.github.kfcfans.oms.common; + +/** + * OhMyScheduler 运行时异常 + * + * @author tjq + * @since 2020/5/26 + */ +public class OmsException extends RuntimeException { + + public OmsException() { + } + + public OmsException(String message) { + super(message); + } + + public OmsException(String message, Throwable cause) { + super(message, cause); + } + + public OmsException(Throwable cause) { + super(cause); + } + + public OmsException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PLWorkflowDAG.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PLWorkflowDAG.java new file mode 100644 index 00000000..891d35b3 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PLWorkflowDAG.java @@ -0,0 +1,42 @@ +package com.github.kfcfans.oms.common.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * Point & Line DAG 表示法 + * 点 + 线,易于表达和传播 + * + * @author tjq + * @since 2020/5/26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class PLWorkflowDAG { + + // DAG 图(点线表示法) + private List nodes; + private List edges; + + // 点 + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Node { + private Long jobId; + private String jobName; + } + + // 边 jobId -> jobId + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Edge { + private Long from; + private Long to; + } +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/WorkflowDAG.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/WorkflowDAG.java new file mode 100644 index 00000000..e69a06e7 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/WorkflowDAG.java @@ -0,0 +1,32 @@ +package com.github.kfcfans.oms.common.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * DAG 工作流对象 + * 使用引用,易于计算 + * + * @author tjq + * @since 2020/5/26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowDAG { + + private Node root; + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static final class Node { + // 后继者,子节点 + private List successors; + private Long jobId; + private String jobName; + } +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java new file mode 100644 index 00000000..a241d220 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java @@ -0,0 +1,39 @@ +package com.github.kfcfans.oms.common.request.http; + +import com.github.kfcfans.oms.common.model.PLWorkflowDAG; +import lombok.Data; + +/** + * 创建/修改 Workflow 请求 + * + * @author tjq + * @since 2020/5/26 + */ +@Data +public class SaveWorkflowRequest { + + private Long id; + + private String wfName; + private String wfDescription; + + // 所属应用ID + private Long appId; + + // 点线表示法 + private PLWorkflowDAG plWorkflowDAG; + + /* ************************** 定时参数 ************************** */ + // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) + private Integer timeExpressionType; + // 时间表达式,CRON/NULL/LONG/LONG + private String timeExpression; + + // 1 正常运行,2 停止(不再调度) + private Integer status; + + // 工作流整体失败的报警 + private String notifyUserIds; + + +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java new file mode 100644 index 00000000..e7508244 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java @@ -0,0 +1,67 @@ +package com.github.kfcfans.oms.common.utils; + +import com.github.kfcfans.oms.common.OmsException; +import com.github.kfcfans.oms.common.model.PLWorkflowDAG; +import com.github.kfcfans.oms.common.model.WorkflowDAG; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import java.util.Map; +import java.util.Set; + +/** + * DAG 工具类 + * + * @author tjq + * @since 2020/5/26 + */ +public class WorkflowDAGUtils { + + /** + * 将点线表示法的DAG图转化为引用表达法的DAG图 + * @param plWorkflowDAG 点线表示法的DAG图 + * @return 引用表示法的DAG图 + */ + public static WorkflowDAG convert(PLWorkflowDAG plWorkflowDAG) { + Set rootIds = Sets.newHashSet(); + Map id2Node = Maps.newHashMap(); + + if (plWorkflowDAG.getNodes() == null || plWorkflowDAG.getNodes().isEmpty()) { + throw new OmsException("empty graph"); + } + + // 创建节点 + plWorkflowDAG.getNodes().forEach(node -> { + Long jobId = node.getJobId(); + WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName()); + id2Node.put(jobId, n); + + // 初始阶段,每一个点都设为顶点 + rootIds.add(jobId); + }); + + // 连接图像 + plWorkflowDAG.getEdges().forEach(edge -> { + WorkflowDAG.Node from = id2Node.get(edge.getFrom()); + WorkflowDAG.Node to = id2Node.get(edge.getTo()); + + if (from == null || to == null) { + throw new OmsException("Illegal Edge: " + JsonUtils.toJSONString(edge)); + } + + from.getSuccessors().add(to); + + // 被连接的点不可能成为 root,移除 + rootIds.remove(to.getJobId()); + }); + + // 合法性校验 + if (rootIds.size() != 1) { + throw new OmsException("Illegal DAG Graph: " + JsonUtils.toJSONString(plWorkflowDAG)); + } + + return new WorkflowDAG(id2Node.get(rootIds.iterator().next())); + } + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/WorkflowInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/WorkflowInfoDO.java new file mode 100644 index 00000000..74f68e2e --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/WorkflowInfoDO.java @@ -0,0 +1,52 @@ +package com.github.kfcfans.oms.server.persistence.core.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.persistence.*; +import java.util.Date; + +/** + * DAG 工作流信息表 + * + * @author tjq + * @since 2020/5/26 + */ +@Data +@Entity +@NoArgsConstructor +@AllArgsConstructor +@Table(name = "workflow_info", indexes = {@Index(columnList = "appId")}) +public class WorkflowInfoDO { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + private String wfName; + private String wfDescription; + + // 所属应用ID + private Long appId; + + // 工作流的DAG图信息(点线式DAG的json) + private String plDAG; + + /* ************************** 定时参数 ************************** */ + // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) + private Integer timeExpressionType; + // 时间表达式,CRON/NULL/LONG/LONG + private String timeExpression; + + // 1 正常运行,2 停止(不再调度) + private Integer status; + // 下一次调度时间 + private Long nextTriggerTime; + + // 工作流整体失败的报警 + private String notifyUserIds; + + private Date gmtCreate; + private Date gmtModified; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInfoRepository.java new file mode 100644 index 00000000..2854c5d3 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInfoRepository.java @@ -0,0 +1,13 @@ +package com.github.kfcfans.oms.server.persistence.core.repository; + +import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; +import org.springframework.data.jpa.repository.JpaRepository; + +/** + * DAG 工作流 数据操作层 + * + * @author tjq + * @since 2020/5/26 + */ +public interface WorkflowInfoRepository extends JpaRepository { +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java new file mode 100644 index 00000000..0c6a1432 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java @@ -0,0 +1,60 @@ +package com.github.kfcfans.oms.server.service.workflow; + +import com.github.kfcfans.oms.common.TimeExpressionType; +import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest; +import com.github.kfcfans.oms.common.utils.JsonUtils; +import com.github.kfcfans.oms.server.common.utils.CronExpression; +import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; +import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Date; + +/** + * Workflow 服务 + * + * @author tjq + * @since 2020/5/26 + */ +@Service +public class WorkflowService { + + @Resource + private WorkflowInfoRepository workflowInfoRepository; + + /** + * 保存/修改DAG工作流 + * @param req 请求 + * @return 工作流ID + * @throws Exception 异常 + */ + public Long saveWorkflow(SaveWorkflowRequest req) throws Exception { + + Long wfId = req.getId(); + WorkflowInfoDO wf; + if (wfId == null) { + wf = new WorkflowInfoDO(); + wf.setGmtCreate(new Date()); + }else { + wf = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id:" + wfId)); + } + + BeanUtils.copyProperties(req, wf); + wf.setGmtModified(new Date()); + wf.setPlDAG(JsonUtils.toJSONString(req.getPlWorkflowDAG())); + + // 计算 NextTriggerTime + TimeExpressionType timeExpressionType = TimeExpressionType.of(req.getTimeExpressionType()); + if (timeExpressionType == TimeExpressionType.CRON) { + CronExpression cronExpression = new CronExpression(req.getTimeExpression()); + Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date()); + wf.setNextTriggerTime(nextValidTime.getTime()); + } + + WorkflowInfoDO newEntity = workflowInfoRepository.saveAndFlush(wf); + return newEntity.getId(); + } + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowController.java new file mode 100644 index 00000000..c7998276 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowController.java @@ -0,0 +1,31 @@ +package com.github.kfcfans.oms.server.web.controller; + +import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest; +import com.github.kfcfans.oms.common.response.ResultDTO; +import com.github.kfcfans.oms.server.service.workflow.WorkflowService; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; + +/** + * 工作流控制器 + * + * @author tjq + * @since 2020/5/26 + */ +@RestController +@RequestMapping("/workflow") +public class WorkflowController { + + @Resource + private WorkflowService workflowService; + + @PostMapping("/save") + public ResultDTO save(@RequestBody SaveWorkflowRequest req) { + return ResultDTO.success(workflowService.saveWorkflow(req)); + } + +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OmsWorkerException.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OmsWorkerException.java deleted file mode 100644 index f042ac35..00000000 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OmsWorkerException.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.github.kfcfans.oms.worker.common; - -/** - * 异常 - * - * @author tjq - * @since 2020/5/16 - */ -public class OmsWorkerException extends RuntimeException { - - public OmsWorkerException() { - } - - public OmsWorkerException(String message) { - super(message); - } - - public OmsWorkerException(String message, Throwable cause) { - super(message, cause); - } - - public OmsWorkerException(Throwable cause) { - super(cause); - } - - public OmsWorkerException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java index b55156d2..7e6eec54 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java @@ -1,7 +1,7 @@ package com.github.kfcfans.oms.worker.container; import com.github.kfcfans.oms.common.ContainerConstant; -import com.github.kfcfans.oms.worker.common.OmsWorkerException; +import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; @@ -106,7 +106,7 @@ public class OmsJarContainer implements OmsContainer { if (propertiesURLStream == null) { log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath()); - throw new OmsWorkerException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME); + throw new OmsException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME); } properties.load(propertiesURLStream); @@ -115,7 +115,7 @@ public class OmsJarContainer implements OmsContainer { String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY); if (StringUtils.isEmpty(packageName)) { log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId); - throw new OmsWorkerException("invalid jar file"); + throw new OmsException("invalid jar file"); } // 加载用户类