mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: 工作流支持节点禁用,原地重试
This commit is contained in:
parent
4ee6300c6a
commit
c3cc8aef4c
@ -57,5 +57,8 @@ public class SystemInstanceResult {
|
||||
*/
|
||||
public static final String INVALID_DAG = "invalid dag";
|
||||
|
||||
|
||||
/**
|
||||
* 被禁用的节点
|
||||
*/
|
||||
public static final String DISABLE_NODE = "disable node";
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import com.google.common.collect.Lists;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
@ -35,6 +36,7 @@ public class PEWorkflowDAG implements Serializable {
|
||||
* Point.
|
||||
*/
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static class Node implements Serializable {
|
||||
|
@ -2,13 +2,13 @@ package com.github.kfcfans.powerjob.server.common.utils;
|
||||
|
||||
import com.github.kfcfans.powerjob.common.InstanceStatus;
|
||||
import com.github.kfcfans.powerjob.common.PowerJobException;
|
||||
import com.github.kfcfans.powerjob.common.SystemInstanceResult;
|
||||
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
|
||||
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
|
||||
import com.github.kfcfans.powerjob.server.model.WorkflowDAG;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.collect.*;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -24,6 +24,7 @@ public class WorkflowDAGUtils {
|
||||
private WorkflowDAGUtils() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有根节点
|
||||
*
|
||||
@ -72,6 +73,128 @@ public class WorkflowDAGUtils {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Add by Echo009 on 2021/02/08
|
||||
* 获取准备好的节点(非完成状态的节点且,前置依赖节点为空或者均处于完成状态)
|
||||
* 注意,这里会直接将当前 disable(enable = false)的节点的状态置为完成
|
||||
*
|
||||
* @param dag 点线表示法的DAG图
|
||||
* @return 当前可执行的节点
|
||||
*/
|
||||
public static List<PEWorkflowDAG.Node> listReadyNodes(PEWorkflowDAG dag) {
|
||||
// 保存 nodeId -> Node 的映射关系
|
||||
Map<Long, PEWorkflowDAG.Node> nodeId2Node = Maps.newHashMap();
|
||||
|
||||
List<PEWorkflowDAG.Node> dagNodes = dag.getNodes();
|
||||
for (PEWorkflowDAG.Node node : dagNodes) {
|
||||
nodeId2Node.put(node.getNodeId(), node);
|
||||
}
|
||||
// 构建依赖树(下游任务需要哪些上游任务完成才能执行)
|
||||
Multimap<Long, Long> relyMap = LinkedListMultimap.create();
|
||||
// 后继节点 Map
|
||||
Multimap<Long, Long> successorMap = LinkedListMultimap.create();
|
||||
dag.getEdges().forEach(edge -> {
|
||||
relyMap.put(edge.getTo(), edge.getFrom());
|
||||
successorMap.put(edge.getFrom(), edge.getTo());
|
||||
});
|
||||
List<PEWorkflowDAG.Node> readyNodes = Lists.newArrayList();
|
||||
List<PEWorkflowDAG.Node> skipNodes = Lists.newArrayList();
|
||||
|
||||
for (PEWorkflowDAG.Node currentNode : dagNodes) {
|
||||
if (!isReadyNode(currentNode.getNodeId(), nodeId2Node, relyMap)) {
|
||||
continue;
|
||||
}
|
||||
// 需要直接跳过的节点
|
||||
if (currentNode.getEnable() != null && !currentNode.getEnable()) {
|
||||
skipNodes.add(currentNode);
|
||||
} else {
|
||||
readyNodes.add(currentNode);
|
||||
}
|
||||
}
|
||||
// 当前直接跳过的节点不为空
|
||||
if (!skipNodes.isEmpty()) {
|
||||
for (PEWorkflowDAG.Node skipNode : skipNodes) {
|
||||
// move
|
||||
readyNodes.addAll(moveAndObtainReadySuccessor(skipNode, nodeId2Node, relyMap, successorMap));
|
||||
}
|
||||
}
|
||||
return readyNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* 移动并获取就绪的后继节点
|
||||
*
|
||||
* @param skippedNode 当前需要跳过的节点
|
||||
* @param nodeId2Node nodeId -> Node
|
||||
* @param relyMap to-node id -> list of from-node id
|
||||
* @param successorMap from-node id -> list of to-node id
|
||||
* @return 就绪的后继节点
|
||||
*/
|
||||
private static List<PEWorkflowDAG.Node> moveAndObtainReadySuccessor(PEWorkflowDAG.Node skippedNode, Map<Long, PEWorkflowDAG.Node> nodeId2Node, Multimap<Long, Long> relyMap, Multimap<Long, Long> successorMap) {
|
||||
|
||||
// 更新当前跳过节点的状态
|
||||
skippedNode.setStatus(InstanceStatus.SUCCEED.getV());
|
||||
skippedNode.setResult(SystemInstanceResult.DISABLE_NODE);
|
||||
// 有可能出现需要连续移动的情况
|
||||
List<PEWorkflowDAG.Node> readyNodes = Lists.newArrayList();
|
||||
List<PEWorkflowDAG.Node> skipNodes = Lists.newArrayList();
|
||||
// 获取当前跳过节点的后继节点
|
||||
Collection<Long> successors = successorMap.get(skippedNode.getNodeId());
|
||||
for (Long successor : successors) {
|
||||
// 判断后继节点是否处于 Ready 状态(前驱节点均处于完成状态)
|
||||
if (isReadyNode(successor, nodeId2Node, relyMap)) {
|
||||
PEWorkflowDAG.Node node = nodeId2Node.get(successor);
|
||||
if (node.getEnable() != null && !node.getEnable()) {
|
||||
// 需要跳过
|
||||
skipNodes.add(node);
|
||||
continue;
|
||||
}
|
||||
readyNodes.add(node);
|
||||
}
|
||||
}
|
||||
// 深度优先,继续移动
|
||||
if (!skipNodes.isEmpty()) {
|
||||
for (PEWorkflowDAG.Node node : skipNodes) {
|
||||
readyNodes.addAll(moveAndObtainReadySuccessor(node, nodeId2Node, relyMap, successorMap));
|
||||
}
|
||||
}
|
||||
return readyNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断当前节点是否准备就绪
|
||||
*
|
||||
* @param nodeId Node id
|
||||
* @param nodeId2Node Node id -> Node
|
||||
* @param relyMap to-node id -> list of from-node id
|
||||
* @return true if current node is ready
|
||||
*/
|
||||
private static boolean isReadyNode(long nodeId, Map<Long, PEWorkflowDAG.Node> nodeId2Node, Multimap<Long, Long> relyMap) {
|
||||
PEWorkflowDAG.Node currentNode = nodeId2Node.get(nodeId);
|
||||
int currentNodeStatus = currentNode.getStatus() == null ? InstanceStatus.WAITING_DISPATCH.getV() : currentNode.getStatus();
|
||||
// 跳过已完成节点(处理成功 或者 处理失败)和已派发节点(存在 InstanceId)
|
||||
if (InstanceStatus.FINISHED_STATUS.contains(currentNodeStatus)
|
||||
|| currentNode.getInstanceId() != null) {
|
||||
return false;
|
||||
}
|
||||
Collection<Long> relyNodeIds = relyMap.get(nodeId);
|
||||
for (Long relyNodeId : relyNodeIds) {
|
||||
PEWorkflowDAG.Node relyNode = nodeId2Node.get(relyNodeId);
|
||||
int relyNodeStatus = relyNode.getStatus() == null ? InstanceStatus.WAITING_DISPATCH.getV() : relyNode.getStatus();
|
||||
// 只要依赖的节点有一个未完成,那么就不是就绪状态
|
||||
// 注意,这里允许失败的原因是有允许失败跳过节点的存在,对于不允许跳过的失败节点,一定走不到这里(工作流会被打断)
|
||||
if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(relyNodeStatus)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public static boolean isNotAllowSkipWhenFailed(PEWorkflowDAG.Node node) {
|
||||
// 默认不允许跳过
|
||||
return node.getSkipWhenFailed() == null || !node.getSkipWhenFailed();
|
||||
}
|
||||
/**
|
||||
* 将点线表示法的DAG图转化为引用表达法的DAG图
|
||||
*
|
||||
|
@ -29,6 +29,8 @@ import org.springframework.stereotype.Service;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
|
||||
import static com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils.isNotAllowSkipWhenFailed;
|
||||
|
||||
/**
|
||||
* 管理运行中的工作流实例
|
||||
*
|
||||
@ -94,10 +96,26 @@ public class WorkflowInstanceManager {
|
||||
newWfInstance.setGmtCreate(now);
|
||||
newWfInstance.setGmtModified(now);
|
||||
|
||||
// 校验 DAG 信息
|
||||
PEWorkflowDAG dag = null;
|
||||
try {
|
||||
dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
|
||||
// 校验
|
||||
if (!WorkflowDAGUtils.valid(dag)) {
|
||||
throw new PowerJobException(SystemInstanceResult.INVALID_DAG);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you has modified the DAG info directly in database!", wfId, wfInstanceId);
|
||||
onWorkflowInstanceFailed(SystemInstanceResult.INVALID_DAG, newWfInstance);
|
||||
return newWfInstance.getWfInstanceId();
|
||||
}
|
||||
// 校验合法性(工作是否存在且启用)
|
||||
Set<Long> allJobIds = Sets.newHashSet();
|
||||
PEWorkflowDAG dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
|
||||
dag.getNodes().forEach(node -> allJobIds.add(node.getJobId()));
|
||||
dag.getNodes().forEach(node -> {
|
||||
allJobIds.add(node.getJobId());
|
||||
// 将节点的初始状态置为等待派发
|
||||
node.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
|
||||
});
|
||||
int needNum = allJobIds.size();
|
||||
long dbNum = jobInfoRepository.countByAppIdAndStatusAndIdIn(wfInfo.getAppId(), SwitchableStatus.ENABLE.getV(), allJobIds);
|
||||
log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum);
|
||||
@ -107,16 +125,9 @@ public class WorkflowInstanceManager {
|
||||
log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", wfId, wfInstanceId, needNum, dbNum);
|
||||
onWorkflowInstanceFailed(SystemInstanceResult.CAN_NOT_FIND_JOB, newWfInstance);
|
||||
} else {
|
||||
// 再次校验 DAG
|
||||
boolean valid = WorkflowDAGUtils.valid(dag);
|
||||
if (!valid) {
|
||||
log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you has modified the DAG info directly in database!", wfId, wfInstanceId);
|
||||
onWorkflowInstanceFailed(SystemInstanceResult.INVALID_DAG, newWfInstance);
|
||||
} else {
|
||||
initNodeInfo(dag);
|
||||
// 再 set 一次,此时工作流中的节点信息已经完全初始化
|
||||
newWfInstance.setDag(JSON.toJSONString(dag));
|
||||
}
|
||||
initNodeInfo(dag);
|
||||
// 再 set 一次,此时工作流中的节点信息已经完全初始化
|
||||
newWfInstance.setDag(JSON.toJSONString(dag));
|
||||
workflowInstanceInfoRepository.saveAndFlush(newWfInstance);
|
||||
}
|
||||
return wfInstanceId;
|
||||
@ -129,6 +140,7 @@ public class WorkflowInstanceManager {
|
||||
* 1、工作流支持配置重复的任务节点
|
||||
* 2、移除参数 initParams,改为统一从工作流实例中获取
|
||||
* 传递工作流实例的 wfContext 作为 初始启动参数
|
||||
* 3、通过 {@link WorkflowDAGUtils#listReadyNodes} 兼容原地重试逻辑
|
||||
* ********************************************
|
||||
*
|
||||
* @param wfInfo 工作流任务信息
|
||||
@ -158,31 +170,36 @@ public class WorkflowInstanceManager {
|
||||
|
||||
try {
|
||||
// 从实例中读取工作流信息
|
||||
PEWorkflowDAG peWorkflowDAG = JSON.parseObject(wfInstanceInfo.getDag(), PEWorkflowDAG.class);
|
||||
List<PEWorkflowDAG.Node> roots = WorkflowDAGUtils.listRoots(peWorkflowDAG);
|
||||
|
||||
peWorkflowDAG.getNodes().forEach(node -> node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()));
|
||||
PEWorkflowDAG dag = JSON.parseObject(wfInstanceInfo.getDag(), PEWorkflowDAG.class);
|
||||
// 根节点有可能被 disable
|
||||
List<PEWorkflowDAG.Node> readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
|
||||
// 创建所有的根任务
|
||||
roots.forEach(root -> {
|
||||
readyNodes.forEach(readyNode -> {
|
||||
// 注意:这里必须保证任务实例全部创建成功,如果在这里创建实例部分失败,会导致 DAG 信息不会更新,已经生成的实例节点在工作流日志中没法展示
|
||||
// instanceParam 传递的是工作流实例的 wfContext
|
||||
Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), root.getJobParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis());
|
||||
root.setInstanceId(instanceId);
|
||||
root.setStatus(InstanceStatus.RUNNING.getV());
|
||||
Long instanceId = instanceService.create(readyNode.getJobId(), wfInfo.getAppId(), readyNode.getJobParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis());
|
||||
readyNode.setInstanceId(instanceId);
|
||||
readyNode.setStatus(InstanceStatus.RUNNING.getV());
|
||||
|
||||
log.info("[Workflow-{}|{}] create root instance(nodeId={},jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, root.getNodeId(), root.getJobId(), instanceId);
|
||||
log.info("[Workflow-{}|{}] create readyNode instance(nodeId={},jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), instanceId);
|
||||
});
|
||||
|
||||
// 持久化
|
||||
wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV());
|
||||
wfInstanceInfo.setDag(JSON.toJSONString(peWorkflowDAG));
|
||||
wfInstanceInfo.setDag(JSON.toJSONString(dag));
|
||||
if (readyNodes.isEmpty()) {
|
||||
// 没有就绪的节点(所有节点都被禁用)
|
||||
wfInstanceInfo.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
|
||||
log.warn("[Workflow-{}|{}] workflowInstance({}) needn't running ", wfInfo.getId(), wfInstanceId, wfInstanceInfo);
|
||||
workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
|
||||
return;
|
||||
}
|
||||
workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
|
||||
log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId);
|
||||
|
||||
// 真正开始执行根任务
|
||||
roots.forEach(this::runInstance);
|
||||
readyNodes.forEach(this::runInstance);
|
||||
} catch (Exception e) {
|
||||
|
||||
log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);
|
||||
onWorkflowInstanceFailed(e.getMessage(), wfInstanceInfo);
|
||||
}
|
||||
@ -195,6 +212,7 @@ public class WorkflowInstanceManager {
|
||||
* 1、工作流支持配置重复的任务节点
|
||||
* 2、不再获取上游任务的结果作为实例参数而是传递工作流
|
||||
* 实例的 wfContext 作为 实例参数
|
||||
* 3、通过 {@link WorkflowDAGUtils#listReadyNodes} 支持跳过禁用的节点
|
||||
* ********************************************
|
||||
*
|
||||
* @param wfInstanceId 工作流任务实例ID
|
||||
@ -225,9 +243,6 @@ public class WorkflowInstanceManager {
|
||||
|
||||
try {
|
||||
PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
|
||||
// 保存 nodeId -> Node 的映射关系
|
||||
Map<Long, PEWorkflowDAG.Node> nodeId2Node = Maps.newHashMap();
|
||||
|
||||
// 更新完成节点状态
|
||||
boolean allFinished = true;
|
||||
PEWorkflowDAG.Node instanceNode = null;
|
||||
@ -241,7 +256,6 @@ public class WorkflowInstanceManager {
|
||||
if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {
|
||||
allFinished = false;
|
||||
}
|
||||
nodeId2Node.put(node.getNodeId(), node);
|
||||
}
|
||||
if (instanceNode == null) {
|
||||
// DAG 中的节点实例已经被覆盖(原地重试,生成了新的实例信息),直接忽略
|
||||
@ -259,7 +273,7 @@ public class WorkflowInstanceManager {
|
||||
}
|
||||
|
||||
// 任务失败 && 不允许失败跳过,DAG流程被打断,整体失败
|
||||
if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode) ) {
|
||||
if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) {
|
||||
log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId);
|
||||
onWorkflowInstanceFailed(SystemInstanceResult.MIDDLE_JOB_FAILED, wfInstance);
|
||||
return;
|
||||
@ -275,7 +289,12 @@ public class WorkflowInstanceManager {
|
||||
log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", wfId, wfInstanceId, instanceId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 注意:这里会直接跳过 disable 的节点
|
||||
List<PEWorkflowDAG.Node> readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
|
||||
// 如果没有就绪的节点,需要再次判断是否已经全部完成
|
||||
if (readyNodes.isEmpty() && isFinish(dag)) {
|
||||
allFinished = true;
|
||||
}
|
||||
// 工作流执行完毕(能执行到这里代表该工作流内所有子任务都执行成功了)
|
||||
if (allFinished) {
|
||||
wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
|
||||
@ -288,36 +307,14 @@ public class WorkflowInstanceManager {
|
||||
return;
|
||||
}
|
||||
|
||||
// 构建依赖树(下游任务需要哪些上游任务完成才能执行)
|
||||
Multimap<Long, Long> relyMap = LinkedListMultimap.create();
|
||||
dag.getEdges().forEach(edge -> relyMap.put(edge.getTo(), edge.getFrom()));
|
||||
|
||||
// 重新计算需要派发的任务
|
||||
List<PEWorkflowDAG.Node> readyNodes = Lists.newArrayList();
|
||||
relyMap.keySet().forEach(nodeId -> {
|
||||
PEWorkflowDAG.Node currentNode = nodeId2Node.get(nodeId);
|
||||
// 跳过已完成节点(处理成功 或者 处理失败)和已派发节点(存在 InstanceId)
|
||||
if (currentNode.getStatus() == InstanceStatus.SUCCEED.getV()
|
||||
|| currentNode.getStatus() == InstanceStatus.FAILED.getV()
|
||||
|| currentNode.getInstanceId() != null) {
|
||||
return;
|
||||
}
|
||||
// 判断某个任务所有依赖的完成情况,只要有一个未完成,即无法执行
|
||||
for (Long reliedNodeId : relyMap.get(nodeId)) {
|
||||
// 注意,这里允许失败的原因是有允许失败跳过节点的存在,对于不允许跳过的失败节点,一定走不到这里(工作流会被打断)
|
||||
if (nodeId2Node.get(reliedNodeId).getStatus() != InstanceStatus.SUCCEED.getV()
|
||||
&& nodeId2Node.get(reliedNodeId).getStatus() != InstanceStatus.FAILED.getV()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
for (PEWorkflowDAG.Node readyNode : readyNodes) {
|
||||
// 同理:这里必须保证任务实例全部创建成功,避免部分失败导致已经生成的实例节点在工作流日志中没法展示
|
||||
// instanceParam 传递的是工作流实例的 wfContext
|
||||
Long newInstanceId = instanceService.create(currentNode.getJobId(), wfInstance.getAppId(), currentNode.getJobParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
|
||||
currentNode.setInstanceId(newInstanceId);
|
||||
currentNode.setStatus(InstanceStatus.RUNNING.getV());
|
||||
readyNodes.add(currentNode);
|
||||
log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, currentNode.getNodeId(), currentNode.getJobId(), newInstanceId);
|
||||
});
|
||||
Long newInstanceId = instanceService.create(readyNode.getJobId(), wfInstance.getAppId(), readyNode.getJobParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
|
||||
readyNode.setInstanceId(newInstanceId);
|
||||
readyNode.setStatus(InstanceStatus.RUNNING.getV());
|
||||
log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), newInstanceId);
|
||||
}
|
||||
|
||||
wfInstance.setDag(JSON.toJSONString(dag));
|
||||
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
|
||||
@ -372,7 +369,6 @@ public class WorkflowInstanceManager {
|
||||
/**
|
||||
* 初始化节点信息
|
||||
*
|
||||
*
|
||||
* @param dag pe dag
|
||||
* @since 20210205
|
||||
*/
|
||||
@ -418,6 +414,16 @@ public class WorkflowInstanceManager {
|
||||
dispatchService.dispatch(jobInfo, node.getInstanceId());
|
||||
}
|
||||
|
||||
|
||||
private boolean isFinish(PEWorkflowDAG dag){
|
||||
for (PEWorkflowDAG.Node node : dag.getNodes()) {
|
||||
if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void onWorkflowInstanceFailed(String result, WorkflowInstanceInfoDO wfInstance) {
|
||||
|
||||
wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV());
|
||||
@ -444,8 +450,4 @@ public class WorkflowInstanceManager {
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isNotAllowSkipWhenFailed(PEWorkflowDAG.Node node) {
|
||||
// 默认不允许跳过
|
||||
return node.getSkipWhenFailed() == null || !node.getSkipWhenFailed();
|
||||
}
|
||||
}
|
||||
|
@ -7,8 +7,11 @@ import com.github.kfcfans.powerjob.common.SystemInstanceResult;
|
||||
import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
|
||||
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
|
||||
import com.github.kfcfans.powerjob.common.response.WorkflowInstanceInfoDTO;
|
||||
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
|
||||
import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInstanceInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
|
||||
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -18,6 +21,9 @@ import org.springframework.stereotype.Service;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import static com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils.isNotAllowSkipWhenFailed;
|
||||
|
||||
/**
|
||||
* 工作流实例服务
|
||||
@ -33,12 +39,17 @@ public class WorkflowInstanceService {
|
||||
private InstanceService instanceService;
|
||||
@Resource
|
||||
private WorkflowInstanceInfoRepository wfInstanceInfoRepository;
|
||||
@Resource
|
||||
private WorkflowInstanceManager workflowInstanceManager;
|
||||
@Resource
|
||||
private WorkflowInfoRepository workflowInfoRepository;
|
||||
|
||||
|
||||
/**
|
||||
* 停止工作流实例
|
||||
*
|
||||
* @param wfInstanceId 工作流实例ID
|
||||
* @param appId 所属应用ID
|
||||
* @param appId 所属应用ID
|
||||
*/
|
||||
public void stopWorkflowInstance(Long wfInstanceId, Long appId) {
|
||||
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
|
||||
@ -56,7 +67,7 @@ public class WorkflowInstanceService {
|
||||
|
||||
instanceService.stopInstance(node.getInstanceId());
|
||||
}
|
||||
}catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSON.toJSONString(node), e);
|
||||
}
|
||||
});
|
||||
@ -70,6 +81,54 @@ public class WorkflowInstanceService {
|
||||
log.info("[WfInstance-{}] stop workflow instance successfully~", wfInstanceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add by Echo009 on 2021/02/07
|
||||
*
|
||||
* @param wfInstanceId 工作流实例ID
|
||||
* @param appId 应用ID
|
||||
*/
|
||||
public void retryWorkflowInstance(Long wfInstanceId, Long appId) {
|
||||
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
|
||||
// 仅允许重试 失败的工作流
|
||||
if (WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
|
||||
throw new PowerJobException("workflow instance is running");
|
||||
}
|
||||
if (wfInstance.getStatus() == WorkflowInstanceStatus.SUCCEED.getV()) {
|
||||
throw new PowerJobException("workflow instance is already successful");
|
||||
}
|
||||
// 因为 DAG 非法而终止的工作流实例无法重试
|
||||
PEWorkflowDAG dag = null;
|
||||
try {
|
||||
dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
|
||||
if (!WorkflowDAGUtils.valid(dag)) {
|
||||
throw new PowerJobException(SystemInstanceResult.INVALID_DAG);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new PowerJobException("you can't retry the workflow instance whose DAG is illegal!");
|
||||
}
|
||||
// 检查当前工作流信息
|
||||
Optional<WorkflowInfoDO> workflowInfo = workflowInfoRepository.findById(wfInstance.getWorkflowId());
|
||||
if (!workflowInfo.isPresent() || workflowInfo.get().getStatus() == SwitchableStatus.DISABLE.getV()) {
|
||||
throw new PowerJobException("you can't retry the workflow instance whose metadata is unavailable!");
|
||||
}
|
||||
// 将需要重试的节点状态重置(失败且不允许跳过的)
|
||||
for (PEWorkflowDAG.Node node : dag.getNodes()) {
|
||||
if (node.getStatus() == InstanceStatus.FAILED.getV()
|
||||
&& isNotAllowSkipWhenFailed(node)) {
|
||||
node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()).setInstanceId(null);
|
||||
}
|
||||
}
|
||||
wfInstance.setDag(JSON.toJSONString(dag));
|
||||
// 更新工作流实例状态,不覆盖实际触发时间
|
||||
wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
|
||||
wfInstance.setGmtModified(new Date());
|
||||
wfInstanceInfoRepository.saveAndFlush(wfInstance);
|
||||
// 立即开始
|
||||
workflowInstanceManager.start(workflowInfo.get(), wfInstanceId);
|
||||
}
|
||||
|
||||
|
||||
public WorkflowInstanceInfoDTO fetchWorkflowInstanceInfo(Long wfInstanceId, Long appId) {
|
||||
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
|
||||
WorkflowInstanceInfoDTO dto = new WorkflowInstanceInfoDTO();
|
||||
|
@ -43,6 +43,12 @@ public class WorkflowInstanceController {
|
||||
return ResultDTO.success(null);
|
||||
}
|
||||
|
||||
@RequestMapping("/retry")
|
||||
public ResultDTO<Void> retryWfInstance(Long wfInstanceId, Long appId){
|
||||
workflowInstanceService.retryWorkflowInstance(wfInstanceId, appId);
|
||||
return ResultDTO.success(null);
|
||||
}
|
||||
|
||||
@GetMapping("/info")
|
||||
public ResultDTO<WorkflowInstanceInfoVO> getInfo(Long wfInstanceId, Long appId) {
|
||||
WorkflowInstanceInfoDO wfInstanceDO = workflowInstanceService.fetchWfInstance(wfInstanceId, appId);
|
||||
|
@ -1,14 +1,17 @@
|
||||
package com.github.kfcfans.powerjob.server.test;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.github.kfcfans.powerjob.common.InstanceStatus;
|
||||
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
|
||||
import com.github.kfcfans.powerjob.server.model.WorkflowDAG;
|
||||
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
|
||||
import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils;
|
||||
import com.github.kfcfans.powerjob.server.model.WorkflowDAG;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* DAG 图算法测试集合
|
||||
@ -80,5 +83,213 @@ public class DAGTest {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @author Echo009
|
||||
* @since 2021/02/07
|
||||
*/
|
||||
@Test
|
||||
public void testListReadyNodes1() {
|
||||
// 双顶点
|
||||
// 1 -> 3
|
||||
// 2(x) -> 4 -> 5
|
||||
// 6(x) -> 7(x) -> 8(x) -> 4
|
||||
// 8(x) -> 9
|
||||
|
||||
List<PEWorkflowDAG.Node> nodes1 = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges1 = Lists.newLinkedList();
|
||||
|
||||
nodes1.add(new PEWorkflowDAG.Node(1L, 1L, "1"));
|
||||
nodes1.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false));
|
||||
nodes1.add(new PEWorkflowDAG.Node(3L, 3L, "3"));
|
||||
nodes1.add(new PEWorkflowDAG.Node(4L, 4L, "4"));
|
||||
nodes1.add(new PEWorkflowDAG.Node(5L, 5L, "5"));
|
||||
nodes1.add(new PEWorkflowDAG.Node(6L, 6L, "6").setEnable(false));
|
||||
nodes1.add(new PEWorkflowDAG.Node(7L, 7L, "7").setEnable(false));
|
||||
nodes1.add(new PEWorkflowDAG.Node(8L, 8L, "8").setEnable(false));
|
||||
nodes1.add(new PEWorkflowDAG.Node(9L, 9L, "9"));
|
||||
edges1.add(new PEWorkflowDAG.Edge(1L, 3L));
|
||||
edges1.add(new PEWorkflowDAG.Edge(2L, 4L));
|
||||
edges1.add(new PEWorkflowDAG.Edge(4L, 5L));
|
||||
edges1.add(new PEWorkflowDAG.Edge(4L, 5L));
|
||||
edges1.add(new PEWorkflowDAG.Edge(6L, 7L));
|
||||
edges1.add(new PEWorkflowDAG.Edge(7L, 8L));
|
||||
edges1.add(new PEWorkflowDAG.Edge(8L, 4L));
|
||||
edges1.add(new PEWorkflowDAG.Edge(8L, 9L));
|
||||
|
||||
PEWorkflowDAG dag1 = new PEWorkflowDAG(nodes1, edges1);
|
||||
List<Long> readyNodeIds1 = WorkflowDAGUtils.listReadyNodes(dag1).stream().map(PEWorkflowDAG.Node::getNodeId).collect(Collectors.toList());
|
||||
|
||||
System.out.println(readyNodeIds1);
|
||||
Assert.assertTrue(readyNodeIds1.contains(1L));
|
||||
Assert.assertTrue(readyNodeIds1.contains(4L));
|
||||
Assert.assertTrue(readyNodeIds1.contains(9L));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @author Echo009
|
||||
* @since 2021/02/07
|
||||
*/
|
||||
@Test
|
||||
public void testListReadyNodes2() {
|
||||
|
||||
// 注:(x) 代表 enable = false 的节点
|
||||
// 测试连续 move
|
||||
// 1(x) -> 2(x) -> 3 -> 4 -> 5(x) -> 6
|
||||
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3"));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4"));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6"));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 3L));
|
||||
edges.add(new PEWorkflowDAG.Edge(3L, 4L));
|
||||
edges.add(new PEWorkflowDAG.Edge(4L, 5L));
|
||||
edges.add(new PEWorkflowDAG.Edge(5L, 6L));
|
||||
|
||||
PEWorkflowDAG dag = new PEWorkflowDAG(nodes, edges);
|
||||
List<Long> readyNodeIds2 = WorkflowDAGUtils.listReadyNodes(dag).stream().map(PEWorkflowDAG.Node::getNodeId).collect(Collectors.toList());
|
||||
|
||||
System.out.println(readyNodeIds2);
|
||||
|
||||
Assert.assertEquals(1, readyNodeIds2.size());
|
||||
Assert.assertTrue(readyNodeIds2.contains(3L));
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @author Echo009
|
||||
* @since 2021/02/07
|
||||
*/
|
||||
@Test
|
||||
public void testListReadyNodes3() {
|
||||
|
||||
// 注:(x) 代表 enable = false 的节点
|
||||
// 复杂 move
|
||||
// 1(failed) -> 2(x) -> 4 -> 5(x) -> 6
|
||||
// 3(success) -> 4
|
||||
// 7 -> 6
|
||||
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setStatus(InstanceStatus.FAILED.getV()));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3").setStatus(InstanceStatus.SUCCEED.getV()));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4"));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6"));
|
||||
nodes.add(new PEWorkflowDAG.Node(7L, 7L, "7"));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 4L));
|
||||
edges.add(new PEWorkflowDAG.Edge(3L, 4L));
|
||||
edges.add(new PEWorkflowDAG.Edge(4L, 5L));
|
||||
edges.add(new PEWorkflowDAG.Edge(5L, 6L));
|
||||
edges.add(new PEWorkflowDAG.Edge(7L, 6L));
|
||||
|
||||
PEWorkflowDAG dag = new PEWorkflowDAG(nodes, edges);
|
||||
List<Long> readyNodeIds2 = WorkflowDAGUtils.listReadyNodes(dag).stream().map(PEWorkflowDAG.Node::getNodeId).collect(Collectors.toList());
|
||||
|
||||
System.out.println(readyNodeIds2);
|
||||
|
||||
Assert.assertEquals(2, readyNodeIds2.size());
|
||||
Assert.assertTrue(readyNodeIds2.contains(4L));
|
||||
Assert.assertTrue(readyNodeIds2.contains(7L));
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @author Echo009
|
||||
* @since 2021/02/07
|
||||
*/
|
||||
@Test
|
||||
public void testListReadyNodes4() {
|
||||
|
||||
// 注:(x) 代表 enable = false 的节点
|
||||
// 复杂 move
|
||||
// 1(failed) -> 2(x) -> 5 -> 6
|
||||
// 3(x) -> 5
|
||||
// 1(failed) -> 3(x) -> 4(x) -> 5
|
||||
// 4(x) -> 6
|
||||
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setStatus(InstanceStatus.FAILED.getV()));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5"));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6"));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 5L));
|
||||
edges.add(new PEWorkflowDAG.Edge(5L, 6L));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 3L));
|
||||
edges.add(new PEWorkflowDAG.Edge(3L, 4L));
|
||||
edges.add(new PEWorkflowDAG.Edge(3L, 5L));
|
||||
edges.add(new PEWorkflowDAG.Edge(4L, 5L));
|
||||
|
||||
PEWorkflowDAG dag = new PEWorkflowDAG(nodes, edges);
|
||||
List<Long> readyNodeIds2 = WorkflowDAGUtils.listReadyNodes(dag).stream().map(PEWorkflowDAG.Node::getNodeId).collect(Collectors.toList());
|
||||
|
||||
System.out.println(readyNodeIds2);
|
||||
|
||||
Assert.assertEquals(1, readyNodeIds2.size());
|
||||
Assert.assertTrue(readyNodeIds2.contains(5L));
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @author Echo009
|
||||
* @since 2021/02/07
|
||||
*/
|
||||
@Test
|
||||
public void testListReadyNodes5() {
|
||||
|
||||
// 注:(x) 代表 enable = false 的节点
|
||||
// 复杂 move
|
||||
// 1(failed) -> 2(x) -> 5 -> 6
|
||||
// 3(x) -> 5
|
||||
// 1(failed) -> 3(x) -> 4(x) -> 5
|
||||
// 4(x) -> 6
|
||||
// 4(x) -> 7
|
||||
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setStatus(InstanceStatus.FAILED.getV()));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5"));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6"));
|
||||
nodes.add(new PEWorkflowDAG.Node(7L, 7L, "7"));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 5L));
|
||||
edges.add(new PEWorkflowDAG.Edge(5L, 6L));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 3L));
|
||||
edges.add(new PEWorkflowDAG.Edge(3L, 4L));
|
||||
edges.add(new PEWorkflowDAG.Edge(3L, 5L));
|
||||
edges.add(new PEWorkflowDAG.Edge(4L, 5L));
|
||||
edges.add(new PEWorkflowDAG.Edge(4L, 7L));
|
||||
|
||||
PEWorkflowDAG dag = new PEWorkflowDAG(nodes, edges);
|
||||
List<Long> readyNodeIds2 = WorkflowDAGUtils.listReadyNodes(dag).stream().map(PEWorkflowDAG.Node::getNodeId).collect(Collectors.toList());
|
||||
|
||||
System.out.println(readyNodeIds2);
|
||||
|
||||
Assert.assertEquals(2, readyNodeIds2.size());
|
||||
Assert.assertTrue(readyNodeIds2.contains(5L));
|
||||
Assert.assertTrue(readyNodeIds2.contains(7L));
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user