[dev] almost finished the DAG feature

tjq 2020-06-02 10:08:04 +08:00
parent 94c88899fc
commit d1f92d5ebf
10 changed files with 69 additions and 35 deletions

View File

@ -1,8 +1,9 @@
package com.github.kfcfans.oms.common.utils;
package com.github.kfcfans.oms.server.common.utils;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.server.model.WorkflowDAG;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
@ -59,12 +60,14 @@ public class WorkflowDAGUtils {
rootIds.remove(to.getJobId());
});
// validity check
if (rootIds.size() != 1) {
// validity check: at least one root vertex must exist
if (rootIds.size() < 1) {
throw new OmsException("Illegal DAG Graph: " + JsonUtils.toJSONString(PEWorkflowDAG));
}
return new WorkflowDAG(id2Node.get(rootIds.iterator().next()));
List<WorkflowDAG.Node> roots = Lists.newLinkedList();
rootIds.forEach(id -> roots.add(id2Node.get(id)));
return new WorkflowDAG(roots);
}
/**
@ -78,7 +81,7 @@ public class WorkflowDAGUtils {
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
Queue<WorkflowDAG.Node> queue = Queues.newLinkedBlockingQueue();
queue.add(dag.getRoot());
queue.addAll(dag.getRoots());
while (!queue.isEmpty()) {
WorkflowDAG.Node node = queue.poll();
@ -105,7 +108,14 @@ public class WorkflowDAGUtils {
public static boolean valid(PEWorkflowDAG peWorkflowDAG) {
try {
WorkflowDAG workflowDAG = convert(peWorkflowDAG);
return check(workflowDAG.getRoot(), Sets.newHashSet());
// check the paths starting from every root vertex
for (WorkflowDAG.Node root : workflowDAG.getRoots()) {
if (!check(root, Sets.newHashSet())) {
return false;
}
}
return true;
}catch (Exception ignore) {
}
return false;
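
To make the multi-root validation above easier to follow, here is a minimal, self-contained sketch of the same idea, written against a simplified stand-in Node class rather than the project's WorkflowDAG.Node: require at least one root, then run a depth-first check from each root with its own visited set, so that reaching a node twice along a single path signals a cycle.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MultiRootDagCheckSketch {

    // Simplified stand-in for WorkflowDAG.Node: a job id plus its successors.
    static class Node {
        final long jobId;
        final List<Node> successors;
        Node(long jobId, List<Node> successors) {
            this.jobId = jobId;
            this.successors = successors;
        }
    }

    // A DAG is considered valid when it has at least one root and
    // every path starting from every root is acyclic.
    static boolean valid(List<Node> roots) {
        if (roots.isEmpty()) {
            return false;
        }
        for (Node root : roots) {
            if (!check(root, new HashSet<>())) {
                return false;
            }
        }
        return true;
    }

    // DFS carrying the job ids already seen on the current path;
    // each successor gets its own copy so sibling branches do not interfere.
    static boolean check(Node node, Set<Long> onPath) {
        if (!onPath.add(node.jobId)) {
            return false; // node already on this path => cycle
        }
        for (Node next : node.successors) {
            if (!check(next, new HashSet<>(onPath))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // two independent roots, mirroring the test graph added below: 1 -> 3, 2 -> 4
        Node n3 = new Node(3L, Collections.emptyList());
        Node n4 = new Node(4L, Collections.emptyList());
        Node n1 = new Node(1L, Arrays.asList(n3));
        Node n2 = new Node(2L, Arrays.asList(n4));
        System.out.println(valid(Arrays.asList(n1, n2))); // expected: true
    }
}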

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.common.model;
package com.github.kfcfans.oms.server.model;
import lombok.AllArgsConstructor;
import lombok.Data;
@ -18,7 +18,8 @@ import java.util.List;
@AllArgsConstructor
public class WorkflowDAG {
private Node root;
// a DAG is allowed to have multiple root vertices
private List<Node> roots;
@Data
@NoArgsConstructor

View File

@ -32,6 +32,8 @@ public class InstanceInfoDO {
// task instance ID
private Long instanceId;
// task instance parameters
@Lob
@Column(columnDefinition="TEXT")
private String instanceParams;
// type of this task instance: normal / workflow (see InstanceType)
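
For context on the @Lob change above: Hibernate maps a plain String field to VARCHAR(255) by default, which is too small for serialized instance parameters, so the column is widened to TEXT. A minimal sketch of that mapping, using a hypothetical entity name rather than the real InstanceInfoDO:

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Lob;

// Hypothetical entity, only to illustrate the @Lob + TEXT column mapping.
@Entity
public class DemoInstanceDO {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    // Without @Lob / columnDefinition the column would be created as VARCHAR(255),
    // which truncates or rejects large serialized parameter strings.
    @Lob
    @Column(columnDefinition = "TEXT")
    private String instanceParams;

    // getters and setters omitted
}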

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.oms.server.service.ha;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.response.AskResponse;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.requests.Ping;
@ -55,7 +56,7 @@ public class ServerSelectService {
// fetch the current Server from the database without locking
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
if (!appInfoOpt.isPresent()) {
throw new RuntimeException(appId + " is not registered!");
throw new OmsException(appId + " is not registered!");
}
String appName = appInfoOpt.get().getAppName();
String originServer = appInfoOpt.get().getCurrentServer();
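
The lookup-or-fail pattern above can also be written with Optional.orElseThrow, which avoids the explicit isPresent/get pair. This is only a drop-in fragment for the same method, not standalone code; it assumes the repository and exception types already visible in the diff:

// equivalent lookup with orElseThrow (sketch, same semantics as the code above)
AppInfoDO appInfo = appInfoRepository.findById(appId)
        .orElseThrow(() -> new OmsException(appId + " is not registered!"));
String appName = appInfo.getAppName();
String originServer = appInfo.getCurrentServer();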

View File

@ -6,8 +6,8 @@ import com.github.kfcfans.oms.common.SystemInstanceResult;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.WorkflowInstanceStatus;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.oms.server.model.WorkflowDAG;
import com.github.kfcfans.oms.server.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO;
@ -121,11 +121,12 @@ public class WorkflowInstanceManager {
WorkflowDAG workflowDAG = WorkflowDAGUtils.convert(JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class));
// run the root task; if no root task can be found, fail directly
WorkflowDAG.Node root = workflowDAG.getRoot();
// create the root task instance
List<WorkflowDAG.Node> roots = workflowDAG.getRoots();
// create instances for all root tasks
roots.forEach(root -> {
Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis());
root.setInstanceId(instanceId);
});
// persist
wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV());
@ -134,7 +135,7 @@ public class WorkflowInstanceManager {
log.info("[Workflow-{}] start workflow successfully, wfInstanceId={}", wfInfo.getId(), wfInstanceId);
// actually start executing the root tasks
runInstance(root.getJobId(), instanceId, wfInstanceId, null);
roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, null));
}catch (Exception e) {
wfInstanceInfo.setStatus(WorkflowInstanceStatus.FAILED.getV());
@ -164,8 +165,6 @@ public class WorkflowInstanceManager {
WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get();
Long wfId = wfInstance.getWorkflowId();
log.debug("[Workflow-{}] one task in dag finished, wfInstanceId={},instanceId={},success={},result={}", wfId, wfInstanceId, instanceId, success, result);
try {
WorkflowDAG dag = JSONObject.parseObject(wfInstance.getDag(), WorkflowDAG.class);
@ -175,7 +174,7 @@ public class WorkflowInstanceManager {
// level-order (BFS) traversal of the DAG to update the status of the finished node
Queue<WorkflowDAG.Node> queue = Queues.newLinkedBlockingQueue();
queue.add(dag.getRoot());
queue.addAll(dag.getRoots());
while (!queue.isEmpty()) {
WorkflowDAG.Node head = queue.poll();
if (instanceId.equals(head.getInstanceId())) {
@ -209,12 +208,16 @@ public class WorkflowInstanceManager {
AtomicBoolean allFinished = new AtomicBoolean(true);
relyMap.keySet().forEach(jobId -> {
// if this task itself is already finished, no further computation is needed, just skip it
// no need to compute nodes that are already finished
if (jobId2Node.get(jobId).isFinished()) {
return;
}
allFinished.set(false);
// an existing instanceId means the task has already been dispatched, no need to compute it again
if (jobId2Node.get(jobId).getInstanceId() != null) {
return;
}
// check the completion of all dependencies of this task; if any one is unfinished, it cannot run yet
for (Long reliedJobId : relyMap.get(jobId)) {
if (!jobId2Node.get(reliedJobId).isFinished()) {
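
The readiness rule implemented above can be summarized as: a node is dispatchable once it is unfinished, has not been given an instanceId yet, and every node it depends on is finished. A small self-contained sketch of that rule follows; relyMap maps each job id to the job ids it depends on, and the Node type is a simplified stand-in for the project's class (the real code also tracks an overall allFinished flag, omitted here):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ReadyNodeSketch {

    // Simplified stand-in for WorkflowDAG.Node.
    static class Node {
        Long instanceId;      // null until the job has been dispatched
        boolean finished;
    }

    // Returns the job ids that can be dispatched right now.
    static List<Long> readyJobs(Map<Long, Set<Long>> relyMap, Map<Long, Node> jobId2Node) {
        List<Long> ready = new LinkedList<>();
        relyMap.forEach((jobId, reliedJobIds) -> {
            Node node = jobId2Node.get(jobId);
            // already finished or already dispatched: nothing to do
            if (node.finished || node.instanceId != null) {
                return;
            }
            // dispatch only when every dependency has finished
            boolean allDepsFinished = reliedJobIds.stream()
                    .allMatch(dep -> jobId2Node.get(dep).finished);
            if (allDepsFinished) {
                ready.add(jobId);
            }
        });
        return ready;
    }

    public static void main(String[] args) {
        Map<Long, Set<Long>> relyMap = new HashMap<>();
        relyMap.put(1L, new HashSet<>());                  // root, no dependencies
        relyMap.put(3L, new HashSet<>(Arrays.asList(1L))); // 3 depends on 1
        Map<Long, Node> nodes = new HashMap<>();
        Node n1 = new Node(); n1.finished = true; n1.instanceId = 10L;
        Node n3 = new Node();
        nodes.put(1L, n1);
        nodes.put(3L, n3);
        System.out.println(readyJobs(relyMap, nodes));     // prints: [3]
    }
}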

View File

@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.SystemInstanceResult;
import com.github.kfcfans.oms.common.WorkflowInstanceStatus;
import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.github.kfcfans.oms.server.model.WorkflowDAG;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.oms.server.service.instance.InstanceService;
@ -44,7 +44,7 @@ public class WorkflowInstanceService {
throw new OmsException("Permission Denied!");
}
if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
throw new OmsException("already stopped");
throw new OmsException("workflow instance already stopped");
}
// update the status in the database
@ -56,7 +56,7 @@ public class WorkflowInstanceService {
// stop all instances that have started but not yet finished
WorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), WorkflowDAG.class);
Queue<WorkflowDAG.Node> queue = Queues.newLinkedBlockingQueue();
queue.add(workflowDAG.getRoot());
queue.addAll(workflowDAG.getRoots());
while (!queue.isEmpty()) {
WorkflowDAG.Node node = queue.poll();
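
The stop logic above walks the whole DAG breadth-first, seeding the queue with every root, and stops each node that has been dispatched (instanceId set) but is not finished. A minimal sketch of that traversal over a simplified node type; stopInstance below is a placeholder, not the project's actual stop API:

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class StopRunningNodesSketch {

    // Simplified stand-in for WorkflowDAG.Node.
    static class Node {
        Long instanceId;      // null if never dispatched
        boolean finished;
        List<Node> successors;
    }

    // BFS from all roots; stop every node that has started but not finished.
    static void stopUnfinished(List<Node> roots) {
        Queue<Node> queue = new ArrayDeque<>(roots);
        while (!queue.isEmpty()) {
            Node node = queue.poll();
            if (node.instanceId != null && !node.finished) {
                stopInstance(node.instanceId);
            }
            if (node.successors != null) {
                queue.addAll(node.successors);
            }
        }
    }

    // Placeholder for the real per-instance stop call.
    static void stopInstance(long instanceId) {
        System.out.println("stopping instance " + instanceId);
    }

    public static void main(String[] args) {
        Node leaf = new Node();
        leaf.instanceId = 101L;               // dispatched but not finished
        Node root = new Node();
        root.instanceId = 100L;
        root.finished = true;                 // already finished, nothing to stop
        root.successors = Arrays.asList(leaf);
        stopUnfinished(Arrays.asList(root));  // prints: stopping instance 101
    }
}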

View File

@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.oms.server.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.oms.server.common.SJ;
import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
import com.github.kfcfans.oms.server.common.utils.CronExpression;

View File

@ -1,8 +1,8 @@
package com.github.kfcfans.oms.server.web.response;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.server.common.SJ;
import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
@ -54,7 +54,7 @@ public class WorkflowInfoVO {
vo.enable = SwitchableStatus.of(wfDO.getStatus()) == SwitchableStatus.ENABLE;
vo.setTimeExpressionType(TimeExpressionType.of(wfDO.getTimeExpressionType()).name());
vo.setPEWorkflowDAG(JsonUtils.parseObjectUnsafe(wfDO.getPeDAG(), PEWorkflowDAG.class));
vo.setPEWorkflowDAG(JSONObject.parseObject(wfDO.getPeDAG(), PEWorkflowDAG.class));
if (!StringUtils.isEmpty(wfDO.getNotifyUserIds())) {
vo.setNotifyUserIds(SJ.commaSplitter.splitToList(wfDO.getNotifyUserIds()).stream().map(Long::valueOf).collect(Collectors.toList()));
}

View File

@ -1,11 +1,11 @@
package com.github.kfcfans.oms.server.web.response;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.common.OmsConstant;
import com.github.kfcfans.oms.common.WorkflowInstanceStatus;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.oms.server.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.oms.server.model.WorkflowDAG;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO;
import lombok.Data;
import org.apache.commons.lang3.time.DateFormatUtils;
@ -45,7 +45,7 @@ public class WorkflowInstanceInfoVO {
vo.setWorkflowName(workflowName);
vo.setStatusStr(WorkflowInstanceStatus.of(wfInstanceDO.getStatus()).getDes());
vo.setPEWorkflowDAG(WorkflowDAGUtils.convert2PE(JsonUtils.parseObjectUnsafe(wfInstanceDO.getDag(), WorkflowDAG.class)));
vo.setPEWorkflowDAG(WorkflowDAGUtils.convert2PE(JSONObject.parseObject(wfInstanceDO.getDag(), WorkflowDAG.class)));
// avoid the JavaScript precision-loss issue (long returned as String)
vo.setWfInstanceId(String.valueOf(wfInstanceDO.getWfInstanceId()));

View File

@ -2,9 +2,9 @@ package com.github.kfcfans.oms.server.test;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.github.kfcfans.oms.server.model.WorkflowDAG;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.oms.server.common.utils.WorkflowDAGUtils;
import com.google.common.collect.Lists;
import org.junit.Test;
@ -61,6 +61,23 @@ public class DAGTest {
// set a breakpoint here to inspect the reference relationships
System.out.println(wfDAGByJackSon);
System.out.println(wfDAGByFastJSON);
// test graph 3 (two root vertices): 1 -> 3, 2 -> 4
List<PEWorkflowDAG.Node> nodes3 = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges3 = Lists.newLinkedList();
nodes3.add(new PEWorkflowDAG.Node(1L, "1", null, false, null));
nodes3.add(new PEWorkflowDAG.Node(2L, "2", null, false, null));
nodes3.add(new PEWorkflowDAG.Node(3L, "3", null, false, null));
nodes3.add(new PEWorkflowDAG.Node(4L, "4", null, false, null));
edges3.add(new PEWorkflowDAG.Edge(1L, 3L));
edges3.add(new PEWorkflowDAG.Edge(2L, 4L));
PEWorkflowDAG multiRootPEDAG = new PEWorkflowDAG(nodes3, edges3);
System.out.println(WorkflowDAGUtils.valid(multiRootPEDAG));
WorkflowDAG multiRootDAG = WorkflowDAGUtils.convert(multiRootPEDAG);
System.out.println(multiRootDAG);
}