[dev] develop the dag converter

tjq 2020-05-26 14:45:24 +08:00
parent 5a7af55cef
commit 897a493a5e
11 changed files with 126 additions and 22 deletions

View File

@@ -7,7 +7,7 @@ import lombok.NoArgsConstructor;
import java.util.List;
/**
* Point & Line DAG representation
* Point & Edge DAG representation
* Points + edges are easy to express and to transmit
*
* @author tjq
@@ -16,7 +16,7 @@ import java.util.List;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PLWorkflowDAG {
public class PEWorkflowDAG {
// point & edge representation of the DAG
private List<Node> nodes;
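As an aside (not part of this diff), a minimal two-node graph in the point & edge form might be assembled as in the sketch below; the Node(jobId, jobName) and Edge(from, to) all-args constructors are assumptions, since the inner classes are not shown in this hunk.
// hypothetical sketch; Node/Edge constructors and field order are assumed, not confirmed by this diff
// Lists is com.google.common.collect.Lists (Guava)
PEWorkflowDAG.Node jobA = new PEWorkflowDAG.Node(1L, "jobA");
PEWorkflowDAG.Node jobB = new PEWorkflowDAG.Node(2L, "jobB");
PEWorkflowDAG.Edge aToB = new PEWorkflowDAG.Edge(1L, 2L);    // edge jobA -> jobB
PEWorkflowDAG dag = new PEWorkflowDAG(Lists.newArrayList(jobA, jobB), Lists.newArrayList(aToB));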

View File

@@ -1,5 +1,6 @@
package com.github.kfcfans.oms.common.model;
import com.github.kfcfans.oms.common.InstanceStatus;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -28,5 +29,10 @@ public class WorkflowDAG {
private List<Node> successors;
private Long jobId;
private String jobName;
// runtime parameters
private Long instanceId;
private InstanceStatus status;
private String result;
}
}

View File

@@ -1,6 +1,6 @@
package com.github.kfcfans.oms.common.request.http;
import com.github.kfcfans.oms.common.model.PLWorkflowDAG;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import lombok.Data;
/**
@@ -21,7 +21,7 @@ public class SaveWorkflowRequest {
private Long appId;
// point & edge representation
private PLWorkflowDAG plWorkflowDAG;
private PEWorkflowDAG pEWorkflowDAG;
/* ************************** timing parameters ************************** */
// time expression type: CRON / API / FIX_RATE / FIX_DELAY

View File

@@ -1,7 +1,7 @@
package com.github.kfcfans.oms.common.utils;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.model.PLWorkflowDAG;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -20,21 +20,21 @@ public class WorkflowDAGUtils {
/**
* Convert a DAG in point & edge representation into a DAG in reference representation
* @param plWorkflowDAG DAG in point & edge representation
* @param PEWorkflowDAG DAG in point & edge representation
* @return DAG in reference representation
*/
public static WorkflowDAG convert(PLWorkflowDAG plWorkflowDAG) {
public static WorkflowDAG convert(PEWorkflowDAG PEWorkflowDAG) {
Set<Long> rootIds = Sets.newHashSet();
Map<Long, WorkflowDAG.Node> id2Node = Maps.newHashMap();
if (plWorkflowDAG.getNodes() == null || plWorkflowDAG.getNodes().isEmpty()) {
if (PEWorkflowDAG.getNodes() == null || PEWorkflowDAG.getNodes().isEmpty()) {
throw new OmsException("empty graph");
}
// create the nodes
plWorkflowDAG.getNodes().forEach(node -> {
PEWorkflowDAG.getNodes().forEach(node -> {
Long jobId = node.getJobId();
WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName());
WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName(), null, null, null);
id2Node.put(jobId, n);
// initially, every node is treated as a root
@@ -42,7 +42,7 @@ public class WorkflowDAGUtils {
});
// connect the graph (build the edges)
plWorkflowDAG.getEdges().forEach(edge -> {
PEWorkflowDAG.getEdges().forEach(edge -> {
WorkflowDAG.Node from = id2Node.get(edge.getFrom());
WorkflowDAG.Node to = id2Node.get(edge.getTo());
@@ -58,10 +58,41 @@ public class WorkflowDAGUtils {
// validity check
if (rootIds.size() != 1) {
throw new OmsException("Illegal DAG Graph: " + JsonUtils.toJSONString(plWorkflowDAG));
throw new OmsException("Illegal DAG Graph: " + JsonUtils.toJSONString(PEWorkflowDAG));
}
return new WorkflowDAG(id2Node.get(rootIds.iterator().next()));
}
/**
* Check whether the DAG is valid
* @param peWorkflowDAG DAG in point & edge representation
* @return true if the DAG is valid, otherwise false
*/
public static boolean valid(PEWorkflowDAG peWorkflowDAG) {
try {
WorkflowDAG workflowDAG = convert(peWorkflowDAG);
return check(workflowDAG.getRoot(), Sets.newHashSet());
}catch (Exception ignore) {
}
return false;
}
private static boolean check(WorkflowDAG.Node root, Set<Long> ids) {
// recursion exits: revisiting a node already on the current path means there is a cycle (fail); a node with no successors means this path is valid
if (ids.contains(root.getJobId())) {
return false;
}
if (root.getSuccessors().isEmpty()) {
return true;
}
ids.add(root.getJobId());
for (WorkflowDAG.Node node : root.getSuccessors()) {
if (!check(node, Sets.newHashSet(ids))) {
return false;
}
}
return true;
}
}
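A hedged usage sketch of the new utility, reusing the hypothetical two-node graph from the earlier sketch: valid() rejects cyclic or multi-root graphs, and convert() yields the reference-style DAG that is walked from its single root.
// sketch only; "dag" is the PEWorkflowDAG assembled in the earlier example
if (WorkflowDAGUtils.valid(dag)) {
    WorkflowDAG workflowDAG = WorkflowDAGUtils.convert(dag);
    System.out.println(workflowDAG.getRoot().getJobName());   // prints "jobA", the single root
}
Worth noting: check() copies the visited set (Sets.newHashSet(ids)) before descending into each successor, so a node reachable through two different branches (a diamond shape) is not mistaken for a cycle; only a path that revisits one of its own ancestors fails.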

View File

@@ -31,7 +31,7 @@ public class WorkflowInfoDO {
private Long appId;
// DAG of the workflow (point & edge representation, stored as JSON)
private String plDAG;
private String peDAG;
/* ************************** timing parameters ************************** */
// time expression type: CRON / API / FIX_RATE / FIX_DELAY

View File

@@ -3,6 +3,8 @@ package com.github.kfcfans.oms.server.persistence.core.repository;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List;
/**
* Data access layer for DAG workflows
*
@@ -10,4 +12,8 @@ import org.springframework.data.jpa.repository.JpaRepository;
* @since 2020/5/26
*/
public interface WorkflowInfoRepository extends JpaRepository<WorkflowInfoDO, Long> {
List<WorkflowInfoDO> findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List<Long> appIds, int status, int timeExpressionType, long time);
}

View File

@@ -1,20 +1,22 @@
package com.github.kfcfans.oms.server.service.timing.schedule;
import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.service.JobService;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository;
import com.github.kfcfans.oms.server.service.DispatchService;
import com.github.kfcfans.oms.server.service.id.IdGenerateService;
import com.github.kfcfans.oms.server.service.JobService;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import com.github.kfcfans.oms.server.service.id.IdGenerateService;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -43,7 +45,7 @@ import java.util.stream.Collectors;
*/
@Slf4j
@Service
public class JobScheduleService {
public class OmsScheduleService {
private static final int MAX_BATCH_NUM = 10;
@@ -56,6 +58,8 @@ public class JobScheduleService {
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@Resource
@@ -188,6 +192,19 @@ public class JobScheduleService {
});
}
private void scheduleWorkflow(List<Long> appIds) {
long nowTime = System.currentTimeMillis();
long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> {
List<WorkflowInfoDO> wfInfos = workflowInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold);
if (CollectionUtils.isEmpty(wfInfos)) {
return;
}
});
}
private void scheduleFrequentJob(List<Long> appIds) {
Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> {
@@ -216,4 +233,5 @@ public class JobScheduleService {
}
});
}
}
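scheduleWorkflow stops after the query in this commit; a speculative continuation (not implemented here) would inject the new WorkflowInstanceManager and hand it the due workflows:
// speculative sketch: a workflowInstanceManager field would need to be @Resource-injected into this class
wfInfos.forEach(workflowInstanceManager::submit);
// advancing each workflow's nextTriggerTime is also still missing at this point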

View File

@@ -0,0 +1,36 @@
package com.github.kfcfans.oms.server.service.workflow;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository;
import com.github.kfcfans.oms.server.service.DispatchService;
import com.google.common.collect.Maps;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
/**
* Manages running workflow instances
*
* @author tjq
* @since 2020/5/26
*/
@Service
public class WorkflowInstanceManager {
@Resource
private DispatchService dispatchService;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private WorkflowInfoRepository workflowInfoRepository;
private final Map<Long, WorkflowInfoDO> wfId2Info = Maps.newConcurrentMap();
public void submit(WorkflowInfoDO wfInfo) {
wfId2Info.put(wfInfo.getId(), wfInfo);
}
}

View File

@@ -1,8 +1,10 @@
package com.github.kfcfans.oms.server.service.workflow;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository;
@@ -32,6 +34,10 @@ public class WorkflowService {
*/
public Long saveWorkflow(SaveWorkflowRequest req) throws Exception {
if (!WorkflowDAGUtils.valid(req.getPEWorkflowDAG())) {
throw new OmsException("illegal DAG");
}
Long wfId = req.getId();
WorkflowInfoDO wf;
if (wfId == null) {
@@ -43,7 +49,7 @@ public class WorkflowService {
BeanUtils.copyProperties(req, wf);
wf.setGmtModified(new Date());
wf.setPlDAG(JsonUtils.toJSONString(req.getPlWorkflowDAG()));
wf.setPeDAG(JsonUtils.toJSONString(req.getPEWorkflowDAG()));
// calculate NextTriggerTime
TimeExpressionType timeExpressionType = TimeExpressionType.of(req.getTimeExpressionType());

View File

@@ -1,5 +1,6 @@
package com.github.kfcfans.oms.server.web;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.response.ResultDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.ControllerAdvice;
@@ -21,7 +22,7 @@ public class ControllerExceptionHandler {
public ResultDTO<Void> exceptionHandler(Exception e) {
// not every exception needs a full stack trace; an internal exception type can be defined later to make the distinction easier
if (e instanceof IllegalArgumentException) {
if (e instanceof IllegalArgumentException || e instanceof OmsException) {
log.warn("[ControllerException] http request failed, message is {}.", e.getMessage());
}else {
log.error("[ControllerException] http request failed.", e);

View File

@@ -24,7 +24,7 @@ public class WorkflowController {
private WorkflowService workflowService;
@PostMapping("/save")
public ResultDTO<Long> save(@RequestBody SaveWorkflowRequest req) {
public ResultDTO<Long> save(@RequestBody SaveWorkflowRequest req) throws Exception {
return ResultDTO.success(workflowService.saveWorkflow(req));
}