[opt] optimize the workflowInstance, try to remove ReferenceDAG

This commit is contained in:
tjq 2020-06-03 17:02:05 +08:00
parent a65787f372
commit d1b3aebb41
3 changed files with 54 additions and 25 deletions

View File

@ -107,6 +107,16 @@ public class WorkflowDAGUtils {
* @return true/false
*/
public static boolean valid(PEWorkflowDAG peWorkflowDAG) {
// 点不允许重复一个工作流中某个任务只允许出现一次
Set<Long> jobIds = Sets.newHashSet();
for (PEWorkflowDAG.Node n : peWorkflowDAG.getNodes()) {
if (jobIds.contains(n.getJobId())) {
return false;
}
jobIds.add(n.getJobId());
}
try {
WorkflowDAG workflowDAG = convert(peWorkflowDAG);

View File

@ -170,7 +170,7 @@ public class InstanceManager {
// workflow 特殊处理
if (wfInstanceId != null) {
// 手动停止在工作流中也认为是失败理论上不应该发生
getWorkflowInstanceManager().move(wfInstanceId, instanceId, status == InstanceStatus.SUCCEED, result);
getWorkflowInstanceManager().move(wfInstanceId, instanceId, status, result);
}
// 告警

View File

@ -26,7 +26,6 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 管理运行中的工作流实例
@ -129,6 +128,8 @@ public class WorkflowInstanceManager {
Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis());
root.setInstanceId(instanceId);
root.setStatus(InstanceStatus.RUNNING.getV());
log.info("[Workflow-{}|{}] create root instance(jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, root.getJobId(), instanceId);
});
// 持久化
@ -155,10 +156,15 @@ public class WorkflowInstanceManager {
* 下一步当工作流的某个任务完成时调用该方法
* @param wfInstanceId 工作流任务实例ID
* @param instanceId 具体完成任务的某个任务实例ID
* @param success 完成任务的任务实例是否成功
* @param status 完成任务的任务实例状态SUCCEED/FAILED/STOPPED
* @param result 完成任务的任务实例结果
*/
public void move(Long wfInstanceId, Long instanceId, boolean success, String result) {
public void move(Long wfInstanceId, Long instanceId, InstanceStatus status, String result) {
// 手动停止的DAG数据已被更新无需再次处理
if (status == InstanceStatus.STOPPED) {
return;
}
int lockId = wfInstanceId.hashCode();
try {
@ -172,6 +178,8 @@ public class WorkflowInstanceManager {
WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get();
Long wfId = wfInstance.getWorkflowId();
log.debug("[Workflow-{}|{}] instanceId={},status={},result={}.", wfId, wfInstanceId, instanceId, status, result);
try {
WorkflowDAG dag = JSONObject.parseObject(wfInstance.getDag(), WorkflowDAG.class);
@ -182,23 +190,37 @@ public class WorkflowInstanceManager {
// 层序遍历 DAG更新完成节点的状态
Queue<WorkflowDAG.Node> queue = Queues.newLinkedBlockingQueue();
queue.addAll(dag.getRoots());
boolean allFinished = true;
while (!queue.isEmpty()) {
WorkflowDAG.Node head = queue.poll();
if (instanceId.equals(head.getInstanceId())) {
head.setStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
head.setStatus(status.getV());
head.setResult(result);
log.debug("[Workflow-{}|{}] node(jobId={}) finished in workflowInstance, success={},result={}", wfId, wfInstanceId, head.getJobId(), success, result);
log.debug("[Workflow-{}|{}] node(jobId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, head.getJobId(), status.name(), result);
}
queue.addAll(head.getSuccessors());
if (head.getStatus() == InstanceStatus.WAITING_DISPATCH.getV() || head.getStatus() == InstanceStatus.RUNNING.getV()) {
allFinished = false;
}
jobId2Node.put(head.getJobId(), head);
head.getSuccessors().forEach(n -> relyMap.put(n.getJobId(), head.getJobId()));
}
wfInstance.setGmtModified(new Date());
wfInstance.setDag(JSONObject.toJSONString(dag));
// 工作流已经结束某个节点失败导致工作流整体已经失败仅更新最新的DAG图
if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", wfId, wfInstanceId, wfInstance.getStatus());
return;
}
// 任务失败DAG流程被打断整体失败
if (!success) {
wfInstance.setDag(JSONObject.toJSONString(dag));
if (status == InstanceStatus.FAILED) {
wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV());
wfInstance.setResult(SystemInstanceResult.MIDDLE_JOB_FAILED);
wfInstance.setFinishedTime(System.currentTimeMillis());
@ -208,21 +230,26 @@ public class WorkflowInstanceManager {
return;
}
// 工作流执行完毕能执行到这里代表该工作流内所有子任务都执行成功了
if (allFinished) {
wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
// 最终任务的结果作为整个 workflow 的结果
wfInstance.setResult(result);
wfInstance.setFinishedTime(System.currentTimeMillis());
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
return;
}
// 重新计算需要派发的任务
Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
Map<Long, String> jobId2InstanceParams = Maps.newHashMap();
AtomicBoolean allFinished = new AtomicBoolean(true);
relyMap.keySet().forEach(jobId -> {
// 无需计算已完成节点理论上此处不可能出现 FAILED 的情况
if (jobId2Node.get(jobId).getStatus() == InstanceStatus.SUCCEED.getV()) {
return;
}
allFinished.set(false);
// 存在 instanceId代表任务已派发过无需再次计算
if (jobId2Node.get(jobId).getInstanceId() != null) {
// 跳过已完成节点理论上此处不可能出现 FAILED 的情况和已派发节点存在 InstanceId
if (jobId2Node.get(jobId).getStatus() == InstanceStatus.SUCCEED.getV() || jobId2Node.get(jobId).getInstanceId() != null) {
return;
}
// 判断某个任务所有依赖的完成情况只要有一个未完成即无法执行
@ -247,14 +274,6 @@ public class WorkflowInstanceManager {
log.debug("[Workflow-{}|{}] workflowInstance start to process new node(jobId={},instanceId={})", wfId, wfInstanceId, jobId, newInstanceId);
});
if (allFinished.get()) {
wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
// 最终任务的结果作为整个 workflow 的结果
wfInstance.setResult(result);
wfInstance.setFinishedTime(System.currentTimeMillis());
log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
}
wfInstance.setDag(JSONObject.toJSONString(dag));
workflowInstanceInfoRepository.saveAndFlush(wfInstance);