[dev] design the DAG model

This commit is contained in:
tjq 2020-05-26 12:06:03 +08:00
parent 8db914939b
commit 5a7af55cef
13 changed files with 371 additions and 63 deletions

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.client;
import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.OpenAPIConstant;
import com.github.kfcfans.oms.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.oms.common.response.InstanceInfoDTO;
@ -72,7 +73,7 @@ public class OhMyClient {
}
if (StringUtils.isEmpty(currentAddress)) {
throw new OmsOpenApiException("no server available");
throw new OmsException("no server available");
}
log.info("[OhMyClient] {}'s oms-client bootstrap successfully.", appName);
}
@ -250,6 +251,6 @@ public class OhMyClient {
}
log.error("[OhMyClient] no server available in {}.", allAddress);
throw new OmsOpenApiException("no server available");
throw new OmsException("no server available");
}
}

View File

@ -1,29 +0,0 @@
package com.github.kfcfans.oms.client;
/**
* 异常
*
* @author tjq
* @since 2020/5/14
*/
public class OmsOpenApiException extends RuntimeException {
public OmsOpenApiException() {
}
public OmsOpenApiException(String message) {
super(message);
}
public OmsOpenApiException(String message, Throwable cause) {
super(message, cause);
}
public OmsOpenApiException(Throwable cause) {
super(cause);
}
public OmsOpenApiException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,29 @@
package com.github.kfcfans.oms.common;
/**
* OhMyScheduler 运行时异常
*
* @author tjq
* @since 2020/5/26
*/
public class OmsException extends RuntimeException {
public OmsException() {
}
public OmsException(String message) {
super(message);
}
public OmsException(String message, Throwable cause) {
super(message, cause);
}
public OmsException(Throwable cause) {
super(cause);
}
public OmsException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,42 @@
package com.github.kfcfans.oms.common.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* Point & Line DAG 表示法
* + 线易于表达和传播
*
* @author tjq
* @since 2020/5/26
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PLWorkflowDAG {
// DAG 点线表示法
private List<Node> nodes;
private List<Edge> edges;
//
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Node {
private Long jobId;
private String jobName;
}
// jobId -> jobId
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Edge {
private Long from;
private Long to;
}
}

View File

@ -0,0 +1,32 @@
package com.github.kfcfans.oms.common.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* DAG 工作流对象
* 使用引用易于计算
*
* @author tjq
* @since 2020/5/26
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WorkflowDAG {
private Node root;
@Data
@NoArgsConstructor
@AllArgsConstructor
public static final class Node {
// 后继者子节点
private List<Node> successors;
private Long jobId;
private String jobName;
}
}

View File

@ -0,0 +1,39 @@
package com.github.kfcfans.oms.common.request.http;
import com.github.kfcfans.oms.common.model.PLWorkflowDAG;
import lombok.Data;
/**
* 创建/修改 Workflow 请求
*
* @author tjq
* @since 2020/5/26
*/
@Data
public class SaveWorkflowRequest {
private Long id;
private String wfName;
private String wfDescription;
// 所属应用ID
private Long appId;
// 点线表示法
private PLWorkflowDAG plWorkflowDAG;
/* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
private Integer timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
private String timeExpression;
// 1 正常运行2 停止不再调度
private Integer status;
// 工作流整体失败的报警
private String notifyUserIds;
}

View File

@ -0,0 +1,67 @@
package com.github.kfcfans.oms.common.utils;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.model.PLWorkflowDAG;
import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Set;
/**
* DAG 工具类
*
* @author tjq
* @since 2020/5/26
*/
public class WorkflowDAGUtils {
/**
* 将点线表示法的DAG图转化为引用表达法的DAG图
* @param plWorkflowDAG 点线表示法的DAG图
* @return 引用表示法的DAG图
*/
public static WorkflowDAG convert(PLWorkflowDAG plWorkflowDAG) {
Set<Long> rootIds = Sets.newHashSet();
Map<Long, WorkflowDAG.Node> id2Node = Maps.newHashMap();
if (plWorkflowDAG.getNodes() == null || plWorkflowDAG.getNodes().isEmpty()) {
throw new OmsException("empty graph");
}
// 创建节点
plWorkflowDAG.getNodes().forEach(node -> {
Long jobId = node.getJobId();
WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName());
id2Node.put(jobId, n);
// 初始阶段每一个点都设为顶点
rootIds.add(jobId);
});
// 连接图像
plWorkflowDAG.getEdges().forEach(edge -> {
WorkflowDAG.Node from = id2Node.get(edge.getFrom());
WorkflowDAG.Node to = id2Node.get(edge.getTo());
if (from == null || to == null) {
throw new OmsException("Illegal Edge: " + JsonUtils.toJSONString(edge));
}
from.getSuccessors().add(to);
// 被连接的点不可能成为 root移除
rootIds.remove(to.getJobId());
});
// 合法性校验
if (rootIds.size() != 1) {
throw new OmsException("Illegal DAG Graph: " + JsonUtils.toJSONString(plWorkflowDAG));
}
return new WorkflowDAG(id2Node.get(rootIds.iterator().next()));
}
}

View File

@ -0,0 +1,52 @@
package com.github.kfcfans.oms.server.persistence.core.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
import java.util.Date;
/**
* DAG 工作流信息表
*
* @author tjq
* @since 2020/5/26
*/
@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "workflow_info", indexes = {@Index(columnList = "appId")})
public class WorkflowInfoDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String wfName;
private String wfDescription;
// 所属应用ID
private Long appId;
// 工作流的DAG图信息点线式DAG的json
private String plDAG;
/* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
private Integer timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
private String timeExpression;
// 1 正常运行2 停止不再调度
private Integer status;
// 下一次调度时间
private Long nextTriggerTime;
// 工作流整体失败的报警
private String notifyUserIds;
private Date gmtCreate;
private Date gmtModified;
}

View File

@ -0,0 +1,13 @@
package com.github.kfcfans.oms.server.persistence.core.repository;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* DAG 工作流 数据操作层
*
* @author tjq
* @since 2020/5/26
*/
public interface WorkflowInfoRepository extends JpaRepository<WorkflowInfoDO, Long> {
}

View File

@ -0,0 +1,60 @@
package com.github.kfcfans.oms.server.service.workflow;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
/**
* Workflow 服务
*
* @author tjq
* @since 2020/5/26
*/
@Service
public class WorkflowService {
@Resource
private WorkflowInfoRepository workflowInfoRepository;
/**
* 保存/修改DAG工作流
* @param req 请求
* @return 工作流ID
* @throws Exception 异常
*/
public Long saveWorkflow(SaveWorkflowRequest req) throws Exception {
Long wfId = req.getId();
WorkflowInfoDO wf;
if (wfId == null) {
wf = new WorkflowInfoDO();
wf.setGmtCreate(new Date());
}else {
wf = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id:" + wfId));
}
BeanUtils.copyProperties(req, wf);
wf.setGmtModified(new Date());
wf.setPlDAG(JsonUtils.toJSONString(req.getPlWorkflowDAG()));
// 计算 NextTriggerTime
TimeExpressionType timeExpressionType = TimeExpressionType.of(req.getTimeExpressionType());
if (timeExpressionType == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(req.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date());
wf.setNextTriggerTime(nextValidTime.getTime());
}
WorkflowInfoDO newEntity = workflowInfoRepository.saveAndFlush(wf);
return newEntity.getId();
}
}

View File

@ -0,0 +1,31 @@
package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.oms.common.response.ResultDTO;
import com.github.kfcfans.oms.server.service.workflow.WorkflowService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* 工作流控制器
*
* @author tjq
* @since 2020/5/26
*/
@RestController
@RequestMapping("/workflow")
public class WorkflowController {
@Resource
private WorkflowService workflowService;
@PostMapping("/save")
public ResultDTO<Long> save(@RequestBody SaveWorkflowRequest req) {
return ResultDTO.success(workflowService.saveWorkflow(req));
}
}

View File

@ -1,29 +0,0 @@
package com.github.kfcfans.oms.worker.common;
/**
* 异常
*
* @author tjq
* @since 2020/5/16
*/
public class OmsWorkerException extends RuntimeException {
public OmsWorkerException() {
}
public OmsWorkerException(String message) {
super(message);
}
public OmsWorkerException(String message, Throwable cause) {
super(message, cause);
}
public OmsWorkerException(Throwable cause) {
super(cause);
}
public OmsWorkerException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.oms.worker.container;
import com.github.kfcfans.oms.common.ContainerConstant;
import com.github.kfcfans.oms.worker.common.OmsWorkerException;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
@ -106,7 +106,7 @@ public class OmsJarContainer implements OmsContainer {
if (propertiesURLStream == null) {
log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath());
throw new OmsWorkerException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME);
throw new OmsException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME);
}
properties.load(propertiesURLStream);
@ -115,7 +115,7 @@ public class OmsJarContainer implements OmsContainer {
String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY);
if (StringUtils.isEmpty(packageName)) {
log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId);
throw new OmsWorkerException("invalid jar file");
throw new OmsException("invalid jar file");
}
// 加载用户类