diff --git a/README.md b/README.md
index 07a7d582..7728c576 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,9 @@

- [](https://github.com/KFCFans/OhMyScheduler/blob/master/LICENSE)
+
+
+
+
+
OhMyScheduler is a powerful distributed scheduling platform and distributed computing framework based on Akka architecture.It provides you a chance to schedule job and distributed computing easily.
diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java
index 4d9cb4cc..4db453e8 100644
--- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java
+++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java
@@ -34,8 +34,13 @@ public class PEWorkflowDAG {
// 仅向前端输出时需要
private Long instanceId;
- private boolean finished;
+ private int status;
private String result;
+
+ public Node(Long jobId, String jobName) {
+ this.jobId = jobId;
+ this.jobName = jobName;
+ }
}
// 边 jobId -> jobId
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java
index b7503bde..16e507d1 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/WorkflowDAGUtils.java
@@ -1,5 +1,6 @@
package com.github.kfcfans.oms.server.common.utils;
+import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.utils.JsonUtils;
@@ -38,7 +39,7 @@ public class WorkflowDAGUtils {
// 创建节点
PEWorkflowDAG.getNodes().forEach(node -> {
Long jobId = node.getJobId();
- WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName(), null, false, null);
+ WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName(), null, InstanceStatus.WAITING_DISPATCH.getV(), null);
id2Node.put(jobId, n);
// 初始阶段,每一个点都设为顶点
@@ -88,7 +89,7 @@ public class WorkflowDAGUtils {
queue.addAll(node.getSuccessors());
// 添加点
- PEWorkflowDAG.Node peNode = new PEWorkflowDAG.Node(node.getJobId(), node.getJobName(), node.getInstanceId(), node.isFinished(), node.getResult());
+ PEWorkflowDAG.Node peNode = new PEWorkflowDAG.Node(node.getJobId(), node.getJobName(), node.getInstanceId(), node.getStatus(), node.getResult());
nodes.add(peNode);
// 添加线
@@ -111,7 +112,7 @@ public class WorkflowDAGUtils {
// 检查所有顶点的路径
for (WorkflowDAG.Node root : workflowDAG.getRoots()) {
- if (!check(root, Sets.newHashSet())) {
+ if (invalidPath(root, Sets.newHashSet())) {
return false;
}
}
@@ -121,21 +122,21 @@ public class WorkflowDAGUtils {
return false;
}
- private static boolean check(WorkflowDAG.Node root, Set ids) {
+ private static boolean invalidPath(WorkflowDAG.Node root, Set ids) {
// 递归出口(出现之前的节点则代表有环,失败;出现无后继者节点,则说明该路径成功)
if (ids.contains(root.getJobId())) {
- return false;
+ return true;
}
if (root.getSuccessors().isEmpty()) {
- return true;
+ return false;
}
ids.add(root.getJobId());
for (WorkflowDAG.Node node : root.getSuccessors()) {
- if (!check(node, Sets.newHashSet(ids))) {
- return false;
+ if (invalidPath(node, Sets.newHashSet(ids))) {
+ return true;
}
}
- return true;
+ return false;
}
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/model/WorkflowDAG.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/model/WorkflowDAG.java
index e9a8d94f..15419809 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/model/WorkflowDAG.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/model/WorkflowDAG.java
@@ -31,8 +31,8 @@ public class WorkflowDAG {
private String jobName;
// 运行时参数
- private Long instanceId;
- private boolean finished;
+ private Long instanceId; // 任务实例ID
+ private int status; // 状态 WAITING_DISPATCH -> RUNNING -> SUCCEED/FAILED/STOPPED
private String result;
}
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java
index 8aa2084d..db9d5331 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java
@@ -2,6 +2,7 @@ package com.github.kfcfans.oms.server.service.workflow;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.SystemInstanceResult;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.WorkflowInstanceStatus;
@@ -126,6 +127,7 @@ public class WorkflowInstanceManager {
roots.forEach(root -> {
Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis());
root.setInstanceId(instanceId);
+ root.setStatus(InstanceStatus.RUNNING.getV());
});
// 持久化
@@ -178,7 +180,7 @@ public class WorkflowInstanceManager {
while (!queue.isEmpty()) {
WorkflowDAG.Node head = queue.poll();
if (instanceId.equals(head.getInstanceId())) {
- head.setFinished(true);
+ head.setStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
head.setResult(result);
log.debug("[Workflow-{}|{}] node(jobId={}) finished in workflowInstance, success={},result={}", wfId, wfInstanceId, head.getJobId(), success, result);
@@ -208,8 +210,8 @@ public class WorkflowInstanceManager {
AtomicBoolean allFinished = new AtomicBoolean(true);
relyMap.keySet().forEach(jobId -> {
- // 无需计算已完成节点
- if (jobId2Node.get(jobId).isFinished()) {
+ // 无需计算已完成节点(理论上此处不可能出现 FAILED 的情况)
+ if (jobId2Node.get(jobId).getStatus() == InstanceStatus.SUCCEED.getV()) {
return;
}
allFinished.set(false);
@@ -220,7 +222,7 @@ public class WorkflowInstanceManager {
}
// 判断某个任务所有依赖的完成情况,只要有一个未完成,即无法执行
for (Long reliedJobId : relyMap.get(jobId)) {
- if (!jobId2Node.get(reliedJobId).isFinished()) {
+ if (jobId2Node.get(reliedJobId).getStatus() != InstanceStatus.SUCCEED.getV()) {
return;
}
}
@@ -232,6 +234,7 @@ public class WorkflowInstanceManager {
Long newInstanceId = instanceService.create(jobId, wfInstance.getAppId(), JSONObject.toJSONString(preJobId2Result), wfInstanceId, System.currentTimeMillis());
jobId2Node.get(jobId).setInstanceId(newInstanceId);
+ jobId2Node.get(jobId).setStatus(InstanceStatus.RUNNING.getV());
jobId2InstanceId.put(jobId, newInstanceId);
jobId2InstanceParams.put(jobId, JSONObject.toJSONString(preJobId2Result));
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java
index 91690976..783240ce 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java
@@ -1,6 +1,7 @@
package com.github.kfcfans.oms.server.service.workflow;
import com.alibaba.fastjson.JSONObject;
+import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.SystemInstanceResult;
import com.github.kfcfans.oms.common.WorkflowInstanceStatus;
@@ -45,13 +46,6 @@ public class WorkflowInstanceService {
if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
throw new OmsException("workflow instance already stopped");
}
-
- // 修改数据库状态
- wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV());
- wfInstance.setResult(SystemInstanceResult.STOPPED_BY_USER);
- wfInstance.setGmtModified(new Date());
- wfInstanceInfoRepository.saveAndFlush(wfInstance);
-
// 停止所有已启动且未完成的服务
WorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), WorkflowDAG.class);
Queue queue = Queues.newLinkedBlockingQueue();
@@ -59,13 +53,22 @@ public class WorkflowInstanceService {
while (!queue.isEmpty()) {
WorkflowDAG.Node node = queue.poll();
- if (node.getInstanceId() != null && !node.isFinished()) {
+ if (node.getInstanceId() != null && node.getStatus() == InstanceStatus.RUNNING.getV()) {
log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId());
+ node.setStatus(InstanceStatus.STOPPED.getV());
+ node.setResult(SystemInstanceResult.STOPPED_BY_USER);
+
instanceService.stopInstance(node.getInstanceId());
}
queue.addAll(node.getSuccessors());
}
+ // 修改数据库状态
+ wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV());
+ wfInstance.setResult(SystemInstanceResult.STOPPED_BY_USER);
+ wfInstance.setGmtModified(new Date());
+ wfInstanceInfoRepository.saveAndFlush(wfInstance);
+
log.info("[WfInstance-{}] stop workflow instance successfully~", wfInstanceId);
}
diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/DAGTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/DAGTest.java
index a1d45db7..db780af9 100644
--- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/DAGTest.java
+++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/DAGTest.java
@@ -25,8 +25,8 @@ public class DAGTest {
List edges = Lists.newLinkedList();
// 图1: 1 -> 2 -> 1,理论上报错
- nodes.add(new PEWorkflowDAG.Node(1L, "1", null, false, null));
- nodes.add(new PEWorkflowDAG.Node(2L, "2", null, false, null));
+ nodes.add(new PEWorkflowDAG.Node(1L, "1"));
+ nodes.add(new PEWorkflowDAG.Node(2L, "2"));
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
edges.add(new PEWorkflowDAG.Edge(2L, 1L));
System.out.println(WorkflowDAGUtils.valid(new PEWorkflowDAG(nodes, edges)));
@@ -35,10 +35,10 @@ public class DAGTest {
List nodes2 = Lists.newLinkedList();
List edges2 = Lists.newLinkedList();
- nodes2.add(new PEWorkflowDAG.Node(1L, "1", null, false, null));
- nodes2.add(new PEWorkflowDAG.Node(2L, "2", null, false, null));
- nodes2.add(new PEWorkflowDAG.Node(3L, "3", null, false, null));
- nodes2.add(new PEWorkflowDAG.Node(4L, "4", null, false, null));
+ nodes2.add(new PEWorkflowDAG.Node(1L, "1"));
+ nodes2.add(new PEWorkflowDAG.Node(2L, "2"));
+ nodes2.add(new PEWorkflowDAG.Node(3L, "3"));
+ nodes2.add(new PEWorkflowDAG.Node(4L, "4"));
edges2.add(new PEWorkflowDAG.Edge(1L, 2L));
edges2.add(new PEWorkflowDAG.Edge(1L, 3L));
edges2.add(new PEWorkflowDAG.Edge(2L, 4L));
@@ -66,10 +66,10 @@ public class DAGTest {
List nodes3 = Lists.newLinkedList();
List edges3 = Lists.newLinkedList();
- nodes3.add(new PEWorkflowDAG.Node(1L, "1", null, false, null));
- nodes3.add(new PEWorkflowDAG.Node(2L, "2", null, false, null));
- nodes3.add(new PEWorkflowDAG.Node(3L, "3", null, false, null));
- nodes3.add(new PEWorkflowDAG.Node(4L, "4", null, false, null));
+ nodes3.add(new PEWorkflowDAG.Node(1L, "1"));
+ nodes3.add(new PEWorkflowDAG.Node(2L, "2"));
+ nodes3.add(new PEWorkflowDAG.Node(3L, "3"));
+ nodes3.add(new PEWorkflowDAG.Node(4L, "4"));
edges3.add(new PEWorkflowDAG.Edge(1L, 3L));
edges3.add(new PEWorkflowDAG.Edge(2L, 4L));
diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/samples/workflow/WorkflowStandaloneProcessor.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/samples/workflow/WorkflowStandaloneProcessor.java
new file mode 100644
index 00000000..cafd35d1
--- /dev/null
+++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/samples/workflow/WorkflowStandaloneProcessor.java
@@ -0,0 +1,35 @@
+package com.github.kfcfans.oms.samples.workflow;
+
+import com.github.kfcfans.oms.common.utils.JsonUtils;
+import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
+import com.github.kfcfans.oms.worker.core.processor.TaskContext;
+import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
+import com.github.kfcfans.oms.worker.log.OmsLogger;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * 工作流测试
+ *
+ * @author tjq
+ * @since 2020/6/2
+ */
+@Component
+public class WorkflowStandaloneProcessor implements BasicProcessor {
+
+ @Override
+ public ProcessResult process(TaskContext context) throws Exception {
+ OmsLogger logger = context.getOmsLogger();
+ logger.info("current:" + JsonUtils.toJSONString(context));
+ System.out.println("currentContext:");
+ System.out.println(JsonUtils.toJSONString(context));
+
+ // 尝试获取上游任务
+ Map upstreamTaskResult = context.fetchUpstreamTaskResult();
+ System.out.println("工作流上游任务数据:");
+ System.out.println(upstreamTaskResult);
+
+ return new ProcessResult(true, context.getJobId() + " process successfully.");
+ }
+}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
index ada8d32d..4ee62557 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
@@ -155,7 +155,7 @@ public class ProcessorRunnable implements Runnable {
try {
processResult = processor.process(taskContext);
}catch (Exception e) {
- log.warn("[ProcessorRunnable-{}] task({}) process failed.", instanceId, taskContext.getDescription(), e);
+ log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);
processResult = new ProcessResult(false, e.toString());
}
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()));
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java
index 68239fe6..81b3bd88 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java
@@ -1,21 +1,27 @@
package com.github.kfcfans.oms.worker.core.processor;
+import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.worker.log.OmsLogger;
+import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.Setter;
+import lombok.ToString;
+
+import java.util.Map;
/**
* 任务上下文
* 概念统一,所有的worker只处理Task,Job和JobInstance的概念只存在于Server和TaskTracker
- * 单机任务 -> 整个Job变成一个Task
- * 广播任务 -> 整个jOb变成一堆一样的Task
- * MR 任务 -> 被map出来的任务都视为根Task的子Task
+ * 单机任务:整个Job变成一个Task
+ * 广播任务:整个job变成一堆一样的Task
+ * MR 任务:被map出来的任务都视为根Task的子Task
*
* @author tjq
* @since 2020/3/18
*/
@Getter
@Setter
+@ToString
public class TaskContext {
private Long jobId;
@@ -29,7 +35,9 @@ public class TaskContext {
*/
private String jobParams;
/**
- * 通过 OpenAPI 传递的参数
+ * 任务实例运行中参数
+ * 若该任务实例通过 OpenAPI 触发,则该值为 OpenAPI 传递的参数
+ * 若该任务为工作流的某个节点,则该值为上游任务传递下来的数据,推荐通过 {@link TaskContext#fetchUpstreamTaskResult()} 方法获取
*/
private String instanceParams;
/**
@@ -49,17 +57,17 @@ public class TaskContext {
*/
private OmsLogger omsLogger;
- public String getDescription() {
- return "subInstanceId='" + subInstanceId + '\'' +
- ", instanceId='" + instanceId + '\'' +
- ", taskId='" + taskId + '\'' +
- ", taskName='" + taskName + '\'' +
- ", jobParams='" + jobParams + '\'' +
- ", instanceParams='" + instanceParams;
- }
- @Override
- public String toString() {
- return getDescription();
+ /**
+ * 获取工作流上游任务传递的数据(仅该任务实例由工作流触发时存在)
+ * @return key: 上游任务的 jobId;value: 上游任务的 ProcessResult#result
+ */
+ @SuppressWarnings("unchecked")
+ public Map fetchUpstreamTaskResult() {
+ try {
+ return (Map)JsonUtils.parseObject(instanceParams, Map.class);
+ }catch (Exception ignore) {
+ }
+ return Maps.newHashMap();
}
}