[dev] change DAG description method

tjq 2020-06-02 20:14:49 +08:00
parent ae86b70686
commit 55dbc86d29
10 changed files with 110 additions and 51 deletions

View File

@@ -1,5 +1,9 @@
![logo](./others/images/oms-logo.png)
![Java CI with Maven](https://github.com/KFCFans/OhMyScheduler/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master) [![GitHub license](https://img.shields.io/github/license/KFCFans/OhMyScheduler)](https://github.com/KFCFans/OhMyScheduler/blob/master/LICENSE)
<p align="center">
<a href="https://github.com/KFCFans/OhMyScheduler/actions"><img src="https://github.com/KFCFans/OhMyScheduler/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master"></a>
<a href="https://github.com/KFCFans/OhMyScheduler/blob/master/LICENSE"><img src="https://img.shields.io/github/license/KFCFans/OhMyScheduler"></a>
</p>
OhMyScheduler is a powerful distributed scheduling platform and distributed computing framework based on the Akka architecture. It makes scheduling jobs and running distributed computations easy.

View File

@@ -34,8 +34,13 @@ public class PEWorkflowDAG {
// only needed when returning the DAG to the front end
private Long instanceId;
private boolean finished;
private int status;
private String result;
public Node(Long jobId, String jobName) {
this.jobId = jobId;
this.jobName = jobName;
}
}
// jobId -> jobId
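Reviewer note: the point of swapping the boolean finished for an int status is that the front end (see the comment above) can now distinguish waiting, running, succeeded, failed and stopped nodes instead of a single done/not-done flag. A minimal sketch of what a consumer of this DTO gains; the numeric codes below are assumptions, the real values live in com.github.kfcfans.oms.common.InstanceStatus.

public class NodeStatusLabel {
    // Illustrative only: assumed placeholder codes, not the verified enum values.
    static String describe(int status) {
        switch (status) {
            case 1:  return "waiting dispatch";
            case 3:  return "running";
            case 4:  return "failed";
            case 5:  return "succeed";
            case 10: return "stopped";
            default: return "unknown(" + status + ")";
        }
    }

    public static void main(String[] args) {
        // a distinction the old boolean `finished` could not express
        System.out.println(describe(3));
    }
}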

View File

@@ -1,5 +1,6 @@
package com.github.kfcfans.oms.server.common.utils;
import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.utils.JsonUtils;
@@ -38,7 +39,7 @@ public class WorkflowDAGUtils {
// create the nodes
PEWorkflowDAG.getNodes().forEach(node -> {
Long jobId = node.getJobId();
WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName(), null, false, null);
WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName(), null, InstanceStatus.WAITING_DISPATCH.getV(), null);
id2Node.put(jobId, n);
// initially, treat every node as a root
@@ -88,7 +89,7 @@
queue.addAll(node.getSuccessors());
// add the node
PEWorkflowDAG.Node peNode = new PEWorkflowDAG.Node(node.getJobId(), node.getJobName(), node.getInstanceId(), node.isFinished(), node.getResult());
PEWorkflowDAG.Node peNode = new PEWorkflowDAG.Node(node.getJobId(), node.getJobName(), node.getInstanceId(), node.getStatus(), node.getResult());
nodes.add(peNode);
// add the edges
@@ -111,7 +112,7 @@
// check the path from every root
for (WorkflowDAG.Node root : workflowDAG.getRoots()) {
if (!check(root, Sets.newHashSet())) {
if (invalidPath(root, Sets.newHashSet())) {
return false;
}
}
@@ -121,21 +122,21 @@
return false;
}
private static boolean check(WorkflowDAG.Node root, Set<Long> ids) {
private static boolean invalidPath(WorkflowDAG.Node root, Set<Long> ids) {
// recursion exits: a node seen before means a cycle (invalid); a node with no successors means this path is valid
if (ids.contains(root.getJobId())) {
return false;
return true;
}
if (root.getSuccessors().isEmpty()) {
return true;
return false;
}
ids.add(root.getJobId());
for (WorkflowDAG.Node node : root.getSuccessors()) {
if (!check(node, Sets.newHashSet(ids))) {
return false;
}
}
if (invalidPath(node, Sets.newHashSet(ids))) {
return true;
}
}
return false;
}
}
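Reviewer note: renaming check to invalidPath flips the predicate's polarity, so the call site above reads without a double negative. A self-contained sketch of the same recursion over a plain adjacency map (simplified types, not the project's API):

import java.util.*;

public class InvalidPathSketch {

    // true when some path from `root` revisits a node, i.e. the graph has a cycle
    static boolean invalidPath(Long root, Map<Long, List<Long>> successors, Set<Long> seen) {
        if (seen.contains(root)) {
            return true;                                  // seen before -> cycle
        }
        List<Long> next = successors.getOrDefault(root, Collections.emptyList());
        if (next.isEmpty()) {
            return false;                                 // reached a leaf -> path is fine
        }
        seen.add(root);
        for (Long succ : next) {
            // copy the set so sibling branches stay independent
            if (invalidPath(succ, successors, new HashSet<>(seen))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> g = new HashMap<>();
        g.put(1L, Collections.singletonList(2L));
        g.put(2L, Collections.singletonList(1L));         // 1 -> 2 -> 1, same as graph 1 in DAGTest
        System.out.println(invalidPath(1L, g, new HashSet<>()));  // true: cycle detected
    }
}

Copying the seen-set per branch (Sets.newHashSet(ids) in the real code) is what keeps parallel paths from polluting each other, at the cost of extra allocations on wide graphs.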

View File

@@ -31,8 +31,8 @@ public class WorkflowDAG {
private String jobName;
// runtime data
private Long instanceId;
private boolean finished;
private Long instanceId; // task instance ID
private int status; // status: WAITING_DISPATCH -> RUNNING -> SUCCEED/FAILED/STOPPED
private String result;
}
}
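Reviewer note: the comment on status describes a small state machine. A hedged sketch of the transitions exactly as that comment states them (names only; the real integer codes come from InstanceStatus):

public class LifecycleSketch {
    enum NodeStatus {
        WAITING_DISPATCH, RUNNING, SUCCEED, FAILED, STOPPED;

        boolean canTransitionTo(NodeStatus next) {
            switch (this) {
                case WAITING_DISPATCH: return next == RUNNING;
                case RUNNING:          return next == SUCCEED || next == FAILED || next == STOPPED;
                default:               return false;   // SUCCEED/FAILED/STOPPED are terminal
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(NodeStatus.RUNNING.canTransitionTo(NodeStatus.STOPPED));          // true
        System.out.println(NodeStatus.WAITING_DISPATCH.canTransitionTo(NodeStatus.SUCCEED)); // false
    }
}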

View File

@@ -2,6 +2,7 @@ package com.github.kfcfans.oms.server.service.workflow;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.SystemInstanceResult;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.WorkflowInstanceStatus;
@@ -126,6 +127,7 @@ public class WorkflowInstanceManager {
roots.forEach(root -> {
Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis());
root.setInstanceId(instanceId);
root.setStatus(InstanceStatus.RUNNING.getV());
});
// persist the changes
@@ -178,7 +180,7 @@
while (!queue.isEmpty()) {
WorkflowDAG.Node head = queue.poll();
if (instanceId.equals(head.getInstanceId())) {
head.setFinished(true);
head.setStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
head.setResult(result);
log.debug("[Workflow-{}|{}] node(jobId={}) finished in workflowInstance, success={},result={}", wfId, wfInstanceId, head.getJobId(), success, result);
@@ -208,8 +210,8 @@
AtomicBoolean allFinished = new AtomicBoolean(true);
relyMap.keySet().forEach(jobId -> {
// no need to process finished nodes
if (jobId2Node.get(jobId).isFinished()) {
// no need to process finished nodes (in theory, FAILED can never show up here)
if (jobId2Node.get(jobId).getStatus() == InstanceStatus.SUCCEED.getV()) {
return;
}
allFinished.set(false);
@@ -220,7 +222,7 @@
}
// check whether every dependency of the job has finished; a single unfinished one blocks execution
for (Long reliedJobId : relyMap.get(jobId)) {
if (!jobId2Node.get(reliedJobId).isFinished()) {
if (jobId2Node.get(reliedJobId).getStatus() != InstanceStatus.SUCCEED.getV()) {
return;
}
}
@@ -232,6 +234,7 @@
Long newInstanceId = instanceService.create(jobId, wfInstance.getAppId(), JSONObject.toJSONString(preJobId2Result), wfInstanceId, System.currentTimeMillis());
jobId2Node.get(jobId).setInstanceId(newInstanceId);
jobId2Node.get(jobId).setStatus(InstanceStatus.RUNNING.getV());
jobId2InstanceId.put(jobId, newInstanceId);
jobId2InstanceParams.put(jobId, JSONObject.toJSONString(preJobId2Result));
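Reviewer note: the readiness rule above is the core of how the workflow advances: a node is dispatched only once every job it relies on has reached SUCCEED. A standalone sketch with plain maps in place of WorkflowDAG nodes (the SUCCEED code is a placeholder, not the verified enum value):

import java.util.*;

public class ReadinessSketch {
    static final int SUCCEED = 5;   // placeholder code

    // a job is ready when all jobs it relies on have succeeded
    static boolean ready(Long jobId, Map<Long, List<Long>> relyMap, Map<Long, Integer> status) {
        for (Long relied : relyMap.getOrDefault(jobId, Collections.emptyList())) {
            if (status.getOrDefault(relied, 0) != SUCCEED) {
                return false;       // a single unfinished dependency blocks dispatch
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> rely = new HashMap<>();
        rely.put(3L, Arrays.asList(1L, 2L));              // job 3 relies on jobs 1 and 2
        Map<Long, Integer> status = new HashMap<>();
        status.put(1L, SUCCEED);
        System.out.println(ready(3L, rely, status));      // false: job 2 not finished yet
        status.put(2L, SUCCEED);
        System.out.println(ready(3L, rely, status));      // true: dispatch job 3
    }
}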

View File

@@ -1,6 +1,7 @@
package com.github.kfcfans.oms.server.service.workflow;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.SystemInstanceResult;
import com.github.kfcfans.oms.common.WorkflowInstanceStatus;
@@ -45,13 +46,6 @@ public class WorkflowInstanceService {
if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
throw new OmsException("workflow instance already stopped");
}
// update the database record
wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV());
wfInstance.setResult(SystemInstanceResult.STOPPED_BY_USER);
wfInstance.setGmtModified(new Date());
wfInstanceInfoRepository.saveAndFlush(wfInstance);
// stop all started but unfinished instances
WorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), WorkflowDAG.class);
Queue<WorkflowDAG.Node> queue = Queues.newLinkedBlockingQueue();
@@ -59,13 +53,22 @@
while (!queue.isEmpty()) {
WorkflowDAG.Node node = queue.poll();
if (node.getInstanceId() != null && !node.isFinished()) {
if (node.getInstanceId() != null && node.getStatus() == InstanceStatus.RUNNING.getV()) {
log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId());
node.setStatus(InstanceStatus.STOPPED.getV());
node.setResult(SystemInstanceResult.STOPPED_BY_USER);
instanceService.stopInstance(node.getInstanceId());
}
queue.addAll(node.getSuccessors());
}
// update the database record
wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV());
wfInstance.setResult(SystemInstanceResult.STOPPED_BY_USER);
wfInstance.setGmtModified(new Date());
wfInstanceInfoRepository.saveAndFlush(wfInstance);
log.info("[WfInstance-{}] stop workflow instance successfully~", wfInstanceId);
}
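Reviewer note: one detail of the traversal above is that successors are enqueued unconditionally, so in a diamond-shaped DAG a shared successor is polled more than once. That is harmless here, since the RUNNING check makes the stop idempotent, but a seen-set keeps the walk linear. A sketch with a simplified Node type (illustrative, not the project's class):

import java.util.*;

public class StopWalkSketch {
    static class Node {
        Long jobId;
        List<Node> successors = new ArrayList<>();
        Node(Long jobId) { this.jobId = jobId; }
    }

    // BFS over the DAG, visiting each node once even when paths converge
    static void walk(Collection<Node> roots) {
        Queue<Node> queue = new LinkedList<>(roots);
        Set<Long> seen = new HashSet<>();
        while (!queue.isEmpty()) {
            Node node = queue.poll();
            if (!seen.add(node.jobId)) {
                continue;                                // already handled via another path
            }
            System.out.println("visit " + node.jobId);   // stop-if-running logic would go here
            queue.addAll(node.successors);
        }
    }

    public static void main(String[] args) {
        Node n1 = new Node(1L), n2 = new Node(2L), n3 = new Node(3L), n4 = new Node(4L);
        n1.successors.addAll(Arrays.asList(n2, n3));
        n2.successors.add(n4);
        n3.successors.add(n4);                // diamond: node 4 is reachable twice
        walk(Collections.singletonList(n1));  // prints each node exactly once
    }
}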

View File

@@ -25,8 +25,8 @@ public class DAGTest {
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
// graph 1: 1 -> 2 -> 1, which should fail validation
nodes.add(new PEWorkflowDAG.Node(1L, "1", null, false, null));
nodes.add(new PEWorkflowDAG.Node(2L, "2", null, false, null));
nodes.add(new PEWorkflowDAG.Node(1L, "1"));
nodes.add(new PEWorkflowDAG.Node(2L, "2"));
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
edges.add(new PEWorkflowDAG.Edge(2L, 1L));
System.out.println(WorkflowDAGUtils.valid(new PEWorkflowDAG(nodes, edges)));
@@ -35,10 +35,10 @@
List<PEWorkflowDAG.Node> nodes2 = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges2 = Lists.newLinkedList();
nodes2.add(new PEWorkflowDAG.Node(1L, "1", null, false, null));
nodes2.add(new PEWorkflowDAG.Node(2L, "2", null, false, null));
nodes2.add(new PEWorkflowDAG.Node(3L, "3", null, false, null));
nodes2.add(new PEWorkflowDAG.Node(4L, "4", null, false, null));
nodes2.add(new PEWorkflowDAG.Node(1L, "1"));
nodes2.add(new PEWorkflowDAG.Node(2L, "2"));
nodes2.add(new PEWorkflowDAG.Node(3L, "3"));
nodes2.add(new PEWorkflowDAG.Node(4L, "4"));
edges2.add(new PEWorkflowDAG.Edge(1L, 2L));
edges2.add(new PEWorkflowDAG.Edge(1L, 3L));
edges2.add(new PEWorkflowDAG.Edge(2L, 4L));
@@ -66,10 +66,10 @@
List<PEWorkflowDAG.Node> nodes3 = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges3 = Lists.newLinkedList();
nodes3.add(new PEWorkflowDAG.Node(1L, "1", null, false, null));
nodes3.add(new PEWorkflowDAG.Node(2L, "2", null, false, null));
nodes3.add(new PEWorkflowDAG.Node(3L, "3", null, false, null));
nodes3.add(new PEWorkflowDAG.Node(4L, "4", null, false, null));
nodes3.add(new PEWorkflowDAG.Node(1L, "1"));
nodes3.add(new PEWorkflowDAG.Node(2L, "2"));
nodes3.add(new PEWorkflowDAG.Node(3L, "3"));
nodes3.add(new PEWorkflowDAG.Node(4L, "4"));
edges3.add(new PEWorkflowDAG.Edge(1L, 3L));
edges3.add(new PEWorkflowDAG.Edge(2L, 4L));

View File

@@ -0,0 +1,35 @@
package com.github.kfcfans.oms.samples.workflow;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.oms.worker.log.OmsLogger;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Workflow test
*
* @author tjq
* @since 2020/6/2
*/
@Component
public class WorkflowStandaloneProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger logger = context.getOmsLogger();
logger.info("current:" + JsonUtils.toJSONString(context));
System.out.println("currentContext:");
System.out.println(JsonUtils.toJSONString(context));
// try to fetch the upstream tasks' results
Map<Long, String> upstreamTaskResult = context.fetchUpstreamTaskResult();
System.out.println("upstream task results:");
System.out.println(upstreamTaskResult);
return new ProcessResult(true, context.getJobId() + " process successfully.");
}
}

View File

@@ -155,7 +155,7 @@ public class ProcessorRunnable implements Runnable {
try {
processResult = processor.process(taskContext);
}catch (Exception e) {
log.warn("[ProcessorRunnable-{}] task({}) process failed.", instanceId, taskContext.getDescription(), e);
log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);
processResult = new ProcessResult(false, e.toString());
}
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()));

View File

@@ -1,21 +1,27 @@
package com.github.kfcfans.oms.worker.core.processor;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.worker.log.OmsLogger;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.util.Map;
/**
* Task context.
* Unified concept: every worker only deals with Tasks; the concepts of Job and JobInstance exist only on the Server and the TaskTracker.
* standalone task -> the whole Job becomes one Task
* broadcast task -> the whole Job becomes a batch of identical Tasks
* MapReduce task -> every task produced by map is treated as a sub-Task of the root Task
* standalone task: the whole Job becomes one Task
* broadcast task: the whole job becomes a batch of identical Tasks
* MapReduce task: every task produced by map is treated as a sub-Task of the root Task
*
* @author tjq
* @since 2020/3/18
*/
@Getter
@Setter
@ToString
public class TaskContext {
private Long jobId;
@@ -29,7 +35,9 @@ public class TaskContext {
*/
private String jobParams;
/**
* parameters passed in via OpenAPI
* runtime parameters of the task instance
* if this task instance was triggered via OpenAPI, this holds the parameters passed through OpenAPI
* if this task is a node of a workflow, this holds the data passed down by upstream tasks; fetching it via {@link TaskContext#fetchUpstreamTaskResult()} is recommended
*/
private String instanceParams;
/**
@@ -49,17 +57,17 @@
*/
private OmsLogger omsLogger;
public String getDescription() {
return "subInstanceId='" + subInstanceId + '\'' +
", instanceId='" + instanceId + '\'' +
", taskId='" + taskId + '\'' +
", taskName='" + taskName + '\'' +
", jobParams='" + jobParams + '\'' +
", instanceParams='" + instanceParams;
}
@Override
public String toString() {
return getDescription();
/**
* Fetch the data passed down by upstream workflow tasks (only present when this task instance was triggered by a workflow).
* @return key: the upstream task's jobId; value: the upstream task's ProcessResult#result
*/
@SuppressWarnings("unchecked")
public Map<Long, String> fetchUpstreamTaskResult() {
try {
return (Map<Long, String>)JsonUtils.parseObject(instanceParams, Map.class);
}catch (Exception ignore) {
}
return Maps.newHashMap();
}
}
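Reviewer note: a caveat on the unchecked cast in fetchUpstreamTaskResult, assuming JsonUtils wraps a standard JSON mapper such as Jackson: JSON object keys always deserialize as Strings when the target is a raw Map, so the returned map's keys are Strings at runtime despite the Map<Long, String> signature, and a lookup with a Long jobId quietly returns null. A small demonstration against Jackson directly (the mapper choice is an assumption):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class UpstreamResultKeyDemo {
    public static void main(String[] args) throws Exception {
        // instanceParams as the workflow manager writes it: jobId -> result
        String instanceParams = "{\"1\":\"job 1 result\"}";
        Map<?, ?> map = new ObjectMapper().readValue(instanceParams, Map.class);
        System.out.println(map.get("1"));   // "job 1 result"
        System.out.println(map.get(1L));    // null: the key is the String "1", not the Long 1
    }
}

If typed lookups matter to callers, converting the keys with Long.valueOf at the fetch site would restore them.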