feat: support decision node #188

This commit is contained in:
Echo009 2021-12-23 14:13:32 +08:00
parent 8663f3b79f
commit c15cefc447
19 changed files with 902 additions and 119 deletions

View File

@ -15,9 +15,33 @@ public enum WorkflowNodeType {
/**
* 任务节点
*/
JOB(1);
JOB(1,false),
/**
* 判断节点
*/
DECISION(2,true),
/**
* 内嵌工作流
*/
NESTED_WORKFLOW(3,false),
;
private final int code;
/**
* 控制节点
*/
private final boolean controlNode;
public static WorkflowNodeType of(int code) {
for (WorkflowNodeType nodeType : values()) {
if (nodeType.code == code) {
return nodeType;
}
}
throw new IllegalArgumentException("unknown WorkflowNodeType of " + code);
}
}

View File

@ -7,6 +7,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import tech.powerjob.common.enums.WorkflowNodeType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -38,10 +39,10 @@ public class PEWorkflowDAG implements Serializable {
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public static class Node implements Serializable {
/**
* node id
*
* @since 20210128
*/
private Long nodeId;
@ -49,6 +50,8 @@ public class PEWorkflowDAG implements Serializable {
/**
* node type
*
* @see WorkflowNodeType
* @since 20210316
*/
private Integer nodeType;
@ -61,23 +64,41 @@ public class PEWorkflowDAG implements Serializable {
*/
private String nodeName;
@JsonSerialize(using= ToStringSerializer.class)
@JsonSerialize(using = ToStringSerializer.class)
private Long instanceId;
/**
* for decision node, it is JavaScript code
*/
private String nodeParams;
private Integer status;
/**
* for decision node, it can only be "true" or "false"
*/
private String result;
/**
* instanceId will be null if disable .
*/
private Boolean enable;
/**
* mark node which disable by control node.
*/
private Boolean disableByControlNode;
private Boolean skipWhenFailed;
private String startTime;
private String finishedTime;
public Node(Long nodeId) {
this.nodeId = nodeId;
this.nodeType = WorkflowNodeType.JOB.getCode();
}
public Node(Long nodeId, Integer nodeType) {
this.nodeId = nodeId;
this.nodeType = nodeType;
}
}
@ -86,10 +107,29 @@ public class PEWorkflowDAG implements Serializable {
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Edge implements Serializable {
private Long from;
private Long to;
/**
* property,support for complex flow control
* for decision node , it can be "true" or "false"
*/
private String property;
private Boolean enable;
public Edge(long from, long to) {
this.from = from;
this.to = to;
}
public Edge(long from, long to, String property) {
this.from = from;
this.to = to;
this.property = property;
}
}
public PEWorkflowDAG(@Nonnull List<Node> nodes, @Nullable List<Edge> edges) {

View File

@ -0,0 +1,18 @@
package tech.powerjob.server.core.evaluator;
/**
 * Strategy interface for evaluating an executable expression against a given input.
 *
 * @author Echo009
 * @since 2021/12/10
 */
public interface Evaluator {
/**
 * Evaluates the expression using the given input.
 *
 * @param expression the executable expression (e.g. a script)
 * @param input the input bound into the expression's evaluation context; may be null
 * @return the evaluation result
 */
Object evaluate(String expression, Object input);
}

View File

@ -0,0 +1,30 @@
package tech.powerjob.server.core.evaluator;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
/**
 * {@link Evaluator} implementation that runs JavaScript expressions through the
 * Nashorn script engine. The input object is exposed to the script as the
 * global binding named "context".
 *
 * @author Echo009
 * @since 2021/12/10
 */
@Slf4j
@Component
public class JavaScriptEvaluator implements Evaluator {
// NOTE: Nashorn was removed from the JDK in Java 15 (JEP 372); on such JVMs
// getEngineByName returns null, so evaluate() fails fast with a clear message
// instead of a bare NullPointerException.
private static final ScriptEngine ENGINE = new ScriptEngineManager().getEngineByName("nashorn");
@Override
@SneakyThrows
public Object evaluate(String expression, Object input) {
if (ENGINE == null) {
throw new IllegalStateException("Nashorn script engine is not available on this JVM (removed since JDK 15)");
}
// Per-call bindings so concurrent evaluations never share script state.
Bindings bindings = ENGINE.createBindings();
bindings.put("context", input);
return ENGINE.eval(expression, bindings);
}
}

View File

@ -78,7 +78,7 @@ public class InstanceService {
* @param expectTriggerTime 预期执行时间
* @return 任务实例ID
*/
public Long create(Long jobId, Long appId, String jobParams, String instanceParams, Long wfInstanceId, Long expectTriggerTime) {
public Long create(Long jobId, Long appId, String jobParams, String instanceParams, Long wfInstanceId, Long expectTriggerTime) {
Long instanceId = idGenerateService.allocate();
Date now = new Date();

View File

@ -0,0 +1,101 @@
package tech.powerjob.server.core.service;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.core.workflow.hanlder.ControlNodeHandler;
import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler;
import tech.powerjob.server.core.workflow.hanlder.WorkflowNodeHandlerMarker;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
/**
* @author Echo009
* @since 2021/12/9
*/
@Slf4j
@Service
public class WorkflowNodeHandleService {
private final Map<WorkflowNodeType, ControlNodeHandler> controlNodeHandlerContainer;
private final Map<WorkflowNodeType, TaskNodeHandler> taskNodeHandlerContainer;
private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
public WorkflowNodeHandleService(List<ControlNodeHandler> controlNodeHandlerList, List<TaskNodeHandler> taskNodeHandlerList, WorkflowInstanceInfoRepository workflowInstanceInfoRepository) {
// init
controlNodeHandlerContainer = new EnumMap<>(WorkflowNodeType.class);
taskNodeHandlerContainer = new EnumMap<>(WorkflowNodeType.class);
controlNodeHandlerList.forEach(controlNodeHandler -> controlNodeHandlerContainer.put(controlNodeHandler.matchingType(), controlNodeHandler));
taskNodeHandlerList.forEach(taskNodeHandler -> taskNodeHandlerContainer.put(taskNodeHandler.matchingType(), taskNodeHandler));
//
this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
}
/**
* 处理任务节点
* 注意上层调用方必须保证这里的 taskNodeList 不能为空
*/
public void handleTaskNodes(List<PEWorkflowDAG.Node> taskNodeList, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
// 创建任务实例
taskNodeList.forEach(taskNode -> {
// 注意这里必须保证任务实例全部创建成功如果在这里创建实例部分失败会导致 DAG 信息不会更新已经生成的实例节点在工作流日志中没法展示
TaskNodeHandler taskNodeHandler = (TaskNodeHandler) findMatchingHandler(taskNode);
taskNodeHandler.createTaskInstance(taskNode, dag, wfInstanceInfo);
log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={})", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), taskNode.getNodeId(), taskNode.getJobId());
});
// 持久化工作流实例信息
wfInstanceInfo.setDag(JSON.toJSONString(dag));
workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
// 启动
taskNodeList.forEach(taskNode -> {
TaskNodeHandler taskNodeHandler = (TaskNodeHandler) findMatchingHandler(taskNode);
taskNodeHandler.startTaskInstance(taskNode);
});
}
/**
* 处理控制节点
* 注意上层调用方必须保证这里的 controlNodeList 不能为空
*/
public void handleControlNodes(List<PEWorkflowDAG.Node> controlNodeList, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
for (PEWorkflowDAG.Node node : controlNodeList) {
handleControlNode(node, dag, wfInstanceInfo);
}
}
public void handleControlNode(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
ControlNodeHandler controlNodeHandler = (ControlNodeHandler) findMatchingHandler(node);
node.setStartTime(CommonUtils.formatTime(System.currentTimeMillis()));
controlNodeHandler.handle(node, dag, wfInstanceInfo);
node.setFinishedTime(CommonUtils.formatTime(System.currentTimeMillis()));
}
private WorkflowNodeHandlerMarker findMatchingHandler(PEWorkflowDAG.Node node) {
WorkflowNodeType nodeType = WorkflowNodeType.of(node.getNodeType());
WorkflowNodeHandlerMarker res;
if (!nodeType.isControlNode()) {
res = taskNodeHandlerContainer.get(nodeType);
} else {
res = controlNodeHandlerContainer.get(nodeType);
}
if (res == null) {
// impossible
throw new UnsupportedOperationException("unsupported node type : " + nodeType);
}
return res;
}
}

View File

@ -22,6 +22,7 @@ import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.core.lock.UseSegmentLock;
import tech.powerjob.server.core.service.UserService;
import tech.powerjob.server.core.service.WorkflowNodeHandleService;
import tech.powerjob.server.core.uid.IdGenerateService;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.extension.defaultimpl.alram.AlarmCenter;
@ -34,6 +35,7 @@ import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoReposi
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
import static tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils.isNotAllowSkipWhenFailed;
@ -67,6 +69,8 @@ public class WorkflowInstanceManager {
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Resource
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
@Resource
private WorkflowNodeHandleService workflowNodeHandleService;
/**
* 创建工作流任务实例
@ -127,14 +131,14 @@ public class WorkflowInstanceManager {
*/
private void initNodeInfo(PEWorkflowDAG dag) {
for (PEWorkflowDAG.Node node : dag.getNodes()) {
WorkflowNodeInfoDO workflowNodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(()->new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_NODE));
WorkflowNodeInfoDO workflowNodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_NODE));
// 任务节点需初始化 是否启用是否允许失败跳过节点参数 等信息
if (workflowNodeInfo.getType() == null || workflowNodeInfo.getType() == WorkflowNodeType.JOB.getCode()) {
// 任务节点缺失任务信息
if (workflowNodeInfo.getJobId() == null) {
throw new PowerJobException(SystemInstanceResult.ILLEGAL_NODE);
}
JobInfoDO jobInfo = jobInfoRepository.findById(workflowNodeInfo.getJobId()).orElseThrow(()->new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_JOB));
JobInfoDO jobInfo = jobInfoRepository.findById(workflowNodeInfo.getJobId()).orElseThrow(() -> new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_JOB));
node.setNodeType(WorkflowNodeType.JOB.getCode());
// 初始化任务相关信息
@ -216,46 +220,40 @@ public class WorkflowInstanceManager {
return;
}
}
try {
// 从实例中读取工作流信息
PEWorkflowDAG dag = JSON.parseObject(wfInstanceInfo.getDag(), PEWorkflowDAG.class);
// 根节点有可能被 disable
List<PEWorkflowDAG.Node> readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
// 创建所有的根任务
readyNodes.forEach(readyNode -> {
// 注意这里必须保证任务实例全部创建成功如果在这里创建实例部分失败会导致 DAG 信息不会更新已经生成的实例节点在工作流日志中没法展示
// instanceParam 传递的是工作流实例的 wfContext
Long instanceId = instanceService.create(readyNode.getJobId(), wfInfo.getAppId(), readyNode.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis());
readyNode.setInstanceId(instanceId);
readyNode.setStatus(InstanceStatus.RUNNING.getV());
log.info("[Workflow-{}|{}] create readyNode instance(nodeId={},jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), instanceId);
});
// 持久化
wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV());
wfInstanceInfo.setDag(JSON.toJSONString(dag));
// 先处理其中的控制节点
List<PEWorkflowDAG.Node> controlNodes = findControlNodes(readyNodes);
while (!controlNodes.isEmpty()) {
workflowNodeHandleService.handleControlNodes(controlNodes, dag, wfInstanceInfo);
readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
controlNodes = findControlNodes(readyNodes);
}
if (readyNodes.isEmpty()) {
// 没有就绪的节点所有节点都被禁用
wfInstanceInfo.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
wfInstanceInfo.setResult(SystemInstanceResult.NO_ENABLED_NODES);
wfInstanceInfo.setFinishedTime(System.currentTimeMillis());
wfInstanceInfo.setDag(JSON.toJSONString(dag));
log.warn("[Workflow-{}|{}] workflowInstance({}) needn't running ", wfInfo.getId(), wfInstanceId, wfInstanceInfo);
workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
return;
}
workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
// 需要更新工作流实例状态
wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV());
// 处理任务节点
workflowNodeHandleService.handleTaskNodes(readyNodes, dag, wfInstanceInfo);
log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId);
// 真正开始执行根任务
readyNodes.forEach(this::runInstance);
} catch (Exception e) {
log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);
onWorkflowInstanceFailed(e.getMessage(), wfInstanceInfo);
}
}
/**
* 下一步当工作流的某个任务完成时调用该方法
* ********************************************
@ -313,14 +311,14 @@ public class WorkflowInstanceManager {
wfInstance.setGmtModified(new Date());
wfInstance.setDag(JSON.toJSONString(dag));
// 工作流已经结束某个节点失败导致工作流整体已经失败仅更新最新的DAG图
// 工作流已经结束某个节点失败导致工作流整体已经失败仅更新最新的 DAG
if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", wfId, wfInstanceId, wfInstance.getStatus());
return;
}
// 任务失败 && 不允许失败跳过DAG流程被打断整体失败
// 任务失败 && 不允许失败跳过DAG 流程被打断整体失败
if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) {
log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId);
onWorkflowInstanceFailed(SystemInstanceResult.MIDDLE_JOB_FAILED, wfInstance);
@ -329,55 +327,53 @@ public class WorkflowInstanceManager {
// 子任务被手动停止
if (status == InstanceStatus.STOPPED) {
wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV());
wfInstance.setResult(SystemInstanceResult.MIDDLE_JOB_STOPPED);
wfInstance.setFinishedTime(System.currentTimeMillis());
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
updateWorkflowInstanceFinalStatus(wfInstance, SystemInstanceResult.MIDDLE_JOB_STOPPED, WorkflowInstanceStatus.STOPPED);
log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", wfId, wfInstanceId, instanceId);
return;
}
// 注意这里会直接跳过 disable 的节点
List<PEWorkflowDAG.Node> readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
// 这里得重新更新一下因为 WorkflowDAGUtils#listReadyNodes 可能会更新节点状态
wfInstance.setDag(JSON.toJSONString(dag));
// 如果没有就绪的节点需要再次判断是否已经全部完成
if (readyNodes.isEmpty() && isFinish(dag)) {
allFinished = true;
}
// 工作流执行完毕能执行到这里代表该工作流内所有子任务都执行成功了
if (allFinished) {
wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
// 这里得重新更新一下因为 WorkflowDAGUtils#listReadyNodes 可能会更新节点状态
wfInstance.setDag(JSON.toJSONString(dag));
// 最终任务的结果作为整个 workflow 的结果
wfInstance.setResult(result);
wfInstance.setFinishedTime(System.currentTimeMillis());
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
updateWorkflowInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED);
log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
return;
}
for (PEWorkflowDAG.Node readyNode : readyNodes) {
// 同理这里必须保证任务实例全部创建成功避免部分失败导致已经生成的实例节点在工作流日志中没法展示
// instanceParam 传递的是工作流实例的 wfContext
Long newInstanceId = instanceService.create(readyNode.getJobId(), wfInstance.getAppId(), readyNode.getNodeParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
readyNode.setInstanceId(newInstanceId);
readyNode.setStatus(InstanceStatus.RUNNING.getV());
log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), newInstanceId);
// 先处理其中的控制节点
List<PEWorkflowDAG.Node> controlNodes = findControlNodes(readyNodes);
while (!controlNodes.isEmpty()) {
workflowNodeHandleService.handleControlNodes(controlNodes, dag, wfInstance);
readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
controlNodes = findControlNodes(readyNodes);
}
// 这里也得更新 DAG 信息
wfInstance.setDag(JSON.toJSONString(dag));
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
// 持久化结束后开始调度执行所有的任务
readyNodes.forEach(this::runInstance);
// 再次判断是否已完成 允许控制节点出现在末尾
if (readyNodes.isEmpty()) {
if (isFinish(dag)) {
wfInstance.setDag(JSON.toJSONString(dag));
updateWorkflowInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED);
log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
return;
}
// 没有就绪的节点 还没执行完成仅更新 DAG
wfInstance.setDag(JSON.toJSONString(dag));
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
return;
}
// 处理任务节点
workflowNodeHandleService.handleTaskNodes(readyNodes, dag, wfInstance);
} catch (Exception e) {
onWorkflowInstanceFailed("MOVE NEXT STEP FAILED: " + e.getMessage(), wfInstance);
log.error("[Workflow-{}|{}] update failed.", wfId, wfInstanceId, e);
}
}
/**
* 更新工作流上下文
* fix : 得和其他操作工作流实例的方法用同一把锁才行不然有并发问题会导致节点状态被覆盖
@ -412,21 +408,21 @@ public class WorkflowInstanceManager {
}
/**
* 运行任务实例
* 需要将创建和运行任务实例分离否则在秒失败情况下会发生DAG覆盖更新的问题
*
* @param node 节点信息
*/
private void runInstance(PEWorkflowDAG.Node node) {
JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new);
// 洗去时间表达式类型
jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV());
dispatchService.dispatch(jobInfo, node.getInstanceId());
private void updateWorkflowInstanceFinalStatus(WorkflowInstanceInfoDO wfInstance, String result, WorkflowInstanceStatus workflowInstanceStatus) {
wfInstance.setStatus(workflowInstanceStatus.getV());
wfInstance.setResult(result);
wfInstance.setFinishedTime(System.currentTimeMillis());
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
}
private List<PEWorkflowDAG.Node> findControlNodes(List<PEWorkflowDAG.Node> readyNodes) {
return readyNodes.stream().filter(node -> {
WorkflowNodeType nodeType = WorkflowNodeType.of(node.getNodeType());
return nodeType.isControlNode();
}).collect(Collectors.toList());
}
private boolean isFinish(PEWorkflowDAG dag) {
for (PEWorkflowDAG.Node node : dag.getNodes()) {
if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {

View File

@ -1,19 +1,23 @@
package tech.powerjob.server.core.workflow.algorithm;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.*;
import tech.powerjob.common.model.PEWorkflowDAG;
import java.util.List;
import java.util.Map;
/**
* DAG 工作流对象
* 使用引用易于计算不再参与主运算起辅助作用
* 节点中均记录了上游以及下游的连接关系(无法使用 JSON 来序列化以及反序列化)
*
* @author tjq
* @author Echo009
* @since 2020/5/26
*/
@Data
@ToString(exclude = {"nodeMap"})
@NoArgsConstructor
@AllArgsConstructor
public class WorkflowDAG {
@ -23,22 +27,31 @@ public class WorkflowDAG {
*/
private List<Node> roots;
@Data
private Map<Long, Node> nodeMap;
public Node getNode(Long nodeId) {
if (nodeMap == null) {
return null;
}
return nodeMap.get(nodeId);
}
@Getter
@Setter
@EqualsAndHashCode(exclude = {"dependencies", "dependenceEdgeMap", "successorEdgeMap", "holder","successors"})
@ToString(exclude = {"dependencies", "dependenceEdgeMap", "successorEdgeMap", "holder"})
@NoArgsConstructor
public static final class Node {
public Node(List<Node> successors, Long nodeId, Long jobId, String jobName, int status) {
this.successors = successors;
this.nodeId = nodeId;
this.jobId = jobId;
this.jobName = jobName;
this.status = status;
public Node(PEWorkflowDAG.Node node) {
this.nodeId = node.getNodeId();
this.holder = node;
this.dependencies = Lists.newLinkedList();
this.dependenceEdgeMap = Maps.newHashMap();
this.successors = Lists.newLinkedList();
this.successorEdgeMap = Maps.newHashMap();
}
/**
* 后继者子节点
*/
private List<Node> successors;
/**
* node id
*
@ -46,24 +59,23 @@ public class WorkflowDAG {
*/
private Long nodeId;
private Long jobId;
private String jobName;
private PEWorkflowDAG.Node holder;
/**
* 运行时信息
* 依赖的上游节点
*/
private Long instanceId;
private List<Node> dependencies;
/**
* 状态 WAITING_DISPATCH -> RUNNING -> SUCCEED/FAILED/STOPPED
* 连接依赖节点的边
*/
private int status;
private String result;
private Map<Node, PEWorkflowDAG.Edge> dependenceEdgeMap;
/**
* instanceId will be null if disable .
* 后继者子节点
*/
private Boolean enable;
private List<Node> successors;
/**
* 连接后继节点的边
*/
private Map<Node, PEWorkflowDAG.Edge> successorEdgeMap;
private Boolean skipWhenFailed;
}
}

View File

@ -1,11 +1,11 @@
package tech.powerjob.server.core.workflow.algorithm;
import com.google.common.collect.*;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.serialize.JsonUtils;
import com.google.common.collect.*;
import java.util.*;
@ -60,7 +60,7 @@ public class WorkflowDAGUtils {
WorkflowDAG dag = convert(peWorkflowDAG);
// 检查所有顶点的路径
for (WorkflowDAG.Node root : dag.getRoots()) {
if (invalidPath(root, Sets.newHashSet(),traversalNodeIds)) {
if (invalidPath(root, Sets.newHashSet(), traversalNodeIds)) {
return false;
}
}
@ -195,6 +195,56 @@ public class WorkflowDAGUtils {
// 默认不允许跳过
return node.getSkipWhenFailed() == null || !node.getSkipWhenFailed();
}
/**
 * Cascades the disabling of edges (e.g. the branches a decision node rejected):
 * 1. a node reachable ONLY through disabled edges is marked disabled (disableByControlNode) and its status set to CANCELED;
 * 2. every outgoing edge of such a node is marked disabled as well;
 * 3. the method recurses on those newly disabled edges until the propagation stops.
 */
@SuppressWarnings("squid:S3776")
public static void handleDisableEdges(List<PEWorkflowDAG.Edge> disableEdges, WorkflowDAG dag) {
if (disableEdges.isEmpty()) {
return;
}
List<PEWorkflowDAG.Node> disableNodes = Lists.newArrayList();
// For each disabled edge, inspect its target node: if every inbound edge is
// disabled, the node is unreachable and must be disabled and CANCELED.
for (PEWorkflowDAG.Edge disableEdge : disableEdges) {
WorkflowDAG.Node toNode = dag.getNode(disableEdge.getTo());
// Still reachable through at least one enabled (or unset) inbound edge?
Collection<PEWorkflowDAG.Edge> dependenceEdges = toNode.getDependenceEdgeMap().values();
boolean shouldBeDisable = true;
for (PEWorkflowDAG.Edge dependenceEdge : dependenceEdges) {
if (dependenceEdge.getEnable() == null || dependenceEdge.getEnable()) {
shouldBeDisable = false;
break;
}
}
if (shouldBeDisable) {
// disable: mark the underlying PE node as cancelled by a control node
PEWorkflowDAG.Node node = toNode.getHolder();
node.setEnable(false)
.setDisableByControlNode(true)
.setStatus(InstanceStatus.CANCELED.getV());
disableNodes.add(node);
}
}
if (!disableNodes.isEmpty()) {
// Every outgoing edge of a newly disabled node becomes disabled too.
List<PEWorkflowDAG.Edge> targetEdges = Lists.newArrayList();
for (PEWorkflowDAG.Node disableNode : disableNodes) {
WorkflowDAG.Node node = dag.getNode(disableNode.getNodeId());
Collection<PEWorkflowDAG.Edge> edges = node.getSuccessorEdgeMap().values();
for (PEWorkflowDAG.Edge edge : edges) {
edge.setEnable(false);
targetEdges.add(edge);
}
}
// Breadth-first: recurse on the edges we just disabled.
handleDisableEdges(targetEdges, dag);
}
}
/**
* 将点线表示法的DAG图转化为引用表达法的DAG图
*
@ -212,9 +262,8 @@ public class WorkflowDAGUtils {
// 创建节点
peWorkflowDAG.getNodes().forEach(node -> {
Long nodeId = node.getNodeId();
WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), node.getNodeId(), node.getJobId(), node.getNodeName(), InstanceStatus.WAITING_DISPATCH.getV());
WorkflowDAG.Node n = new WorkflowDAG.Node(node);
id2Node.put(nodeId, n);
// 初始阶段每一个点都设为顶点
rootIds.add(nodeId);
});
@ -229,7 +278,9 @@ public class WorkflowDAGUtils {
}
from.getSuccessors().add(to);
from.getSuccessorEdgeMap().put(to, edge);
to.getDependencies().add(from);
to.getDependenceEdgeMap().put(from, edge);
// 被连接的点不可能成为 root移除
rootIds.remove(to.getNodeId());
});
@ -241,7 +292,7 @@ public class WorkflowDAGUtils {
List<WorkflowDAG.Node> roots = Lists.newLinkedList();
rootIds.forEach(id -> roots.add(id2Node.get(id)));
return new WorkflowDAG(roots);
return new WorkflowDAG(roots, id2Node);
}
@ -257,7 +308,7 @@ public class WorkflowDAGUtils {
}
ids.add(root.getNodeId());
for (WorkflowDAG.Node node : root.getSuccessors()) {
if (invalidPath(node, Sets.newHashSet(ids),nodeIdContainer)) {
if (invalidPath(node, Sets.newHashSet(ids), nodeIdContainer)) {
return true;
}
}

View File

@ -0,0 +1,22 @@
package tech.powerjob.server.core.workflow.hanlder;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
/**
* @author Echo009
* @since 2021/12/9
*/
public interface ControlNodeHandler extends WorkflowNodeHandlerMarker {
/**
* 处理控制节点
*
* @param node 需要被处理的目标节点
* @param dag 节点所属 DAG
* @param wfInstanceInfo 节点所属工作流实例
*/
void handle(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo);
}

View File

@ -0,0 +1,29 @@
package tech.powerjob.server.core.workflow.hanlder;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
/**
* @author Echo009
* @since 2021/12/9
*/
public interface TaskNodeHandler extends WorkflowNodeHandlerMarker {
/**
* 创建任务实例
*
* @param node 目标节点
* @param dag DAG
* @param wfInstanceInfo 工作流实例
*/
void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo);
/**
* 执行任务实例
*
* @param node 目标节点
*/
void startTaskInstance(PEWorkflowDAG.Node node);
}

View File

@ -0,0 +1,20 @@
package tech.powerjob.server.core.workflow.hanlder;
import tech.powerjob.common.enums.WorkflowNodeType;
/**
* @author Echo009
* @since 2021/12/9
*/
public interface WorkflowNodeHandlerMarker {
/**
* 返回能够处理的节点类型
* @return matching node type
*/
WorkflowNodeType matchingType();
}

View File

@ -0,0 +1,98 @@
package tech.powerjob.server.core.workflow.hanlder.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.server.core.evaluator.JavaScriptEvaluator;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.core.workflow.hanlder.ControlNodeHandler;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import java.util.*;
/**
* @author Echo009
* @since 2021/12/9
*/
@Slf4j
@Component
public class DecisionNodeHandler implements ControlNodeHandler {
private final JavaScriptEvaluator javaScriptEvaluator = new JavaScriptEvaluator();
/**
* 处理判断节点
* 1. 执行脚本
* 2. 根据返回值 disable 掉相应的边以及节点
*/
@Override
public void handle(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
String script = node.getNodeParams();
if (StringUtils.isBlank(script)) {
log.error("[Workflow-{}|{}]decision node's param is blank! nodeId:{}", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId());
throw new PowerJobException("decision node's param is blank!");
}
// wfContext must be a map
HashMap<String, String> wfContext = JSON.parseObject(wfInstanceInfo.getWfContext(), new TypeReference<HashMap<String, String>>() {
});
Object result;
try {
result = javaScriptEvaluator.evaluate(script, wfContext);
} catch (Exception e) {
log.error("[Workflow-{}|{}]failed to evaluate decision node,nodeId:{}", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), e);
throw new PowerJobException("can't evaluate decision node!");
}
boolean finalRes;
if (result instanceof Boolean) {
finalRes = ((Boolean) result);
} else if (result instanceof Number) {
finalRes = ((Number) result).doubleValue() > 0;
} else {
log.error("[Workflow-{}|{}]decision node's return value is illegal,nodeId:{},result:{}", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), JsonUtils.toJSONString(result));
throw new PowerJobException("decision node's return value is illegal!");
}
handleDag(finalRes, node, dag);
}
private void handleDag(boolean res, PEWorkflowDAG.Node node, PEWorkflowDAG peDag) {
// 更新判断节点的状态为成功
node.setResult(String.valueOf(res));
node.setStatus(InstanceStatus.SUCCEED.getV());
WorkflowDAG dag = WorkflowDAGUtils.convert(peDag);
// 根据节点的计算结果将相应的边 disable
WorkflowDAG.Node targetNode = dag.getNode(node.getNodeId());
Collection<PEWorkflowDAG.Edge> edges = targetNode.getSuccessorEdgeMap().values();
if (edges.isEmpty()) {
return;
}
List<PEWorkflowDAG.Edge> disableEdges = new ArrayList<>(edges.size());
for (PEWorkflowDAG.Edge edge : edges) {
// 这里一定不会出现异常
boolean property = Boolean.parseBoolean(edge.getProperty());
if (res != property) {
// disable
edge.setEnable(false);
disableEdges.add(edge);
}
}
WorkflowDAGUtils.handleDisableEdges(disableEdges,dag);
}
@Override
public WorkflowNodeType matchingType() {
return WorkflowNodeType.DECISION;
}
}

View File

@ -0,0 +1,56 @@
package tech.powerjob.server.core.workflow.hanlder.impl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import javax.annotation.Resource;
/**
* @author Echo009
* @since 2021/12/9
*/
@Slf4j
@Component
public class JobNodeHandler implements TaskNodeHandler {
@Resource
private InstanceService instanceService;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private DispatchService dispatchService;
@Override
public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
// instanceParam 传递的是工作流实例的 wfContext
Long instanceId = instanceService.create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis());
node.setInstanceId(instanceId);
node.setStatus(InstanceStatus.RUNNING.getV());
log.info("[Workflow-{}|{}] create readyNode(JOB) instance(nodeId={},jobId={},instanceId={}) successfully~", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId(), instanceId);
}
@Override
public void startTaskInstance(PEWorkflowDAG.Node node) {
JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new);
// 洗去时间表达式类型
jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV());
dispatchService.dispatch(jobInfo, node.getInstanceId());
}
@Override
public WorkflowNodeType matchingType() {
return WorkflowNodeType.JOB;
}
}

View File

@ -40,6 +40,7 @@ public class WorkflowNodeInfoDO {
private Integer type;
/**
* 任务 ID
* 对于嵌套工作流类型的节点而言这里存储是工作流 ID
*/
private Long jobId;
/**

View File

@ -0,0 +1,32 @@
package tech.powerjob.server.core.data;
import com.google.common.collect.Lists;
import tech.powerjob.common.model.PEWorkflowDAG;
import java.util.List;
/**
* @author Echo009
* @since 2021/12/10
*/
public class DataConstructUtil {
public static void addNodes(PEWorkflowDAG dag, PEWorkflowDAG.Node... nodes) {
for (PEWorkflowDAG.Node node : nodes) {
dag.getNodes().add(node);
}
}
public static void addEdges(PEWorkflowDAG dag, PEWorkflowDAG.Edge... edges) {
for (PEWorkflowDAG.Edge edge : edges) {
dag.getEdges().add(edge);
}
}
public static PEWorkflowDAG constructEmptyDAG() {
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
return new PEWorkflowDAG(nodes, edges);
}
}

View File

@ -0,0 +1,99 @@
package tech.powerjob.server.core.evaluator;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.junit.Assert;
import org.junit.Test;
import tech.powerjob.common.serialize.JsonUtils;
import java.util.HashMap;
/**
 * Tests for {@code JavaScriptEvaluator}, which evaluates the JavaScript snippets
 * configured on decision nodes, optionally exposing a workflow context to the
 * script as the global {@code context} object.
 * <p>
 * Context maps mirror a real workflow context: every value is a JSON-serialized
 * string, so scripts must {@code JSON.parse(...)} non-trivial values.
 *
 * @author Echo009
 * @since 2021/12/10
 */
public class JavaScriptEvaluatorTest {
private final JavaScriptEvaluator javaScriptEvaluator = new JavaScriptEvaluator();
// flat context: primitive values serialized to JSON strings
private final HashMap<String, String> SIMPLE_CONTEXT = new HashMap<>();
// nested context: array / object graph / map, all serialized to JSON strings
private final HashMap<String, String> COMPLEX_CONTEXT = new HashMap<>();
{
// simple context
// {"k1":"1","k2":"\"2\"","k3":"false","k4":"1.1"}
SIMPLE_CONTEXT.put("k1", JsonUtils.toJSONString(1));
SIMPLE_CONTEXT.put("k2", JsonUtils.toJSONString("2"));
SIMPLE_CONTEXT.put("k3", JsonUtils.toJSONString(false));
SIMPLE_CONTEXT.put("k4", JsonUtils.toJSONString(1.1d));
// complex context
// {"array":"[1,2,3,4,5]","obj":"{\"id\":\"e3\",\"value\":3,\"sub\":{\"id\":\"e2\",\"value\":2,\"sub\":{\"id\":\"e1\",\"value\":1,\"sub\":null}}}","map":"{\"e1\":{\"id\":\"e1\",\"value\":1,\"sub\":null}}"}
COMPLEX_CONTEXT.put("array", JsonUtils.toJSONString(new int[]{1, 2, 3, 4, 5}));
// e3 -> e2 -> e1 linked via the "sub" field
Element e1 = new Element("e1",1,null);
Element e2 = new Element("e2",2,e1);
Element e3 = new Element("e3",3,e2);
COMPLEX_CONTEXT.put("obj", JsonUtils.toJSONString(e3));
HashMap<String, Object> map = new HashMap<>();
map.put("e1",e1);
COMPLEX_CONTEXT.put("map",JsonUtils.toJSONString(map));
}
// a bare script with no context: last expression is the result
@Test
public void testSimpleEval1() {
Object res = javaScriptEvaluator.evaluate("var x = false; x;", null);
Assert.assertEquals(false, res);
}
// property access on a script-local object
@Test
public void testSimpleEval2() {
Object res = javaScriptEvaluator.evaluate("var person = {name:'echo',tag:'y'}; person.name;", null);
Assert.assertEquals("echo", res);
}
@Test
public void testSimpleEval3() {
// inject simple context; raw context values are JSON strings ("false"), parsed back on the Java side
Object res = javaScriptEvaluator.evaluate("var res = context.k3; res;", SIMPLE_CONTEXT);
Boolean s = JsonUtils.parseObjectUnsafe(res.toString(), Boolean.class);
Assert.assertEquals(false, s);
}
// parsing the context value inside the script yields a real JS boolean
@Test
public void testSimpleEval4() {
Object res = javaScriptEvaluator.evaluate("var res = JSON.parse(context.k3); res == false;", SIMPLE_CONTEXT);
Assert.assertEquals(true, res);
}
// nested structures: array element, map lookup, and object-graph traversal
@Test
public void testComplexEval1() {
// array
Object res = javaScriptEvaluator.evaluate("var res = JSON.parse(context.array); res[0] == 1;", COMPLEX_CONTEXT);
Assert.assertEquals(true, res);
// map
res = javaScriptEvaluator.evaluate("var map = JSON.parse(context.map); var e1 = map.e1; e1.value ",COMPLEX_CONTEXT);
Assert.assertEquals(1,res);
// object
res = javaScriptEvaluator.evaluate("var e3 = JSON.parse(context.obj); var e1 = e3.sub.sub; e1.value ",COMPLEX_CONTEXT);
Assert.assertEquals(1,res);
}
// simple linked element used to build the nested "obj"/"map" context entries
@Data
@AllArgsConstructor
@NoArgsConstructor
static class Element {
private String id;
private Integer value;
private Element sub;
}
}

View File

@ -0,0 +1,165 @@
package tech.powerjob.server.core.workflow.hanlder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.core.workflow.hanlder.impl.DecisionNodeHandler;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import static tech.powerjob.server.core.data.DataConstructUtil.*;
/**
 * Tests for {@link DecisionNodeHandler}: a decision node evaluates its JS
 * nodeParams and disables the branch whose edge property does not match the
 * result, cascading the disable to downstream nodes/edges that have no other
 * enabled path.
 * <p>
 * If these cases change, keep the design doc in sync:
 * https://www.yuque.com/powerjob/dev/bgw03h/edit?toc_node_uuid=V9igz9SZ30lF59bX
 *
 * @author Echo009
 * @since 2021/12/9
 */
class DecisionNodeHandlerTest {
private final DecisionNodeHandler decisionNodeHandler = new DecisionNodeHandler();
// single decision node: the "false" branch and its whole chain are disabled
@Test
void testCase1() {
PEWorkflowDAG peWorkflowDAG = constructEmptyDAG();
PEWorkflowDAG.Node node1 = new PEWorkflowDAG.Node(1L, WorkflowNodeType.DECISION.getCode());
// decision node return true
node1.setNodeParams("true;");
PEWorkflowDAG.Node node2 = new PEWorkflowDAG.Node(2L);
PEWorkflowDAG.Node node3 = new PEWorkflowDAG.Node(3L);
PEWorkflowDAG.Node node4 = new PEWorkflowDAG.Node(4L);
addNodes(peWorkflowDAG, node1, node2, node3, node4);
PEWorkflowDAG.Edge edge1_2 = new PEWorkflowDAG.Edge(1L, 2L, "false");
PEWorkflowDAG.Edge edge1_3 = new PEWorkflowDAG.Edge(1L, 3L, "true");
PEWorkflowDAG.Edge edge2_4 = new PEWorkflowDAG.Edge(2L, 4L);
addEdges(peWorkflowDAG, edge1_2, edge1_3, edge2_4);
decisionNodeHandler.handle(node1, peWorkflowDAG, new WorkflowInstanceInfoDO());
// nodes on the "false" branch are disabled and flagged as disabled-by-control-node
Assertions.assertEquals(false, node2.getEnable());
Assertions.assertEquals(true, node2.getDisableByControlNode());
Assertions.assertEquals(false, node4.getEnable());
Assertions.assertEquals(true, node4.getDisableByControlNode());
// edges on the disabled branch are disabled too
Assertions.assertEquals(false, edge1_2.getEnable());
Assertions.assertEquals(false, edge2_4.getEnable());
}
// node 5 has an alternative enabled path (3 -> 5), so it must survive
@Test
void testCase2() {
PEWorkflowDAG peWorkflowDAG = constructEmptyDAG();
PEWorkflowDAG.Node node1 = new PEWorkflowDAG.Node(1L, WorkflowNodeType.DECISION.getCode());
// decision node return true
node1.setNodeParams("true;");
PEWorkflowDAG.Node node2 = new PEWorkflowDAG.Node(2L);
PEWorkflowDAG.Node node3 = new PEWorkflowDAG.Node(3L);
PEWorkflowDAG.Node node4 = new PEWorkflowDAG.Node(4L);
PEWorkflowDAG.Node node5 = new PEWorkflowDAG.Node(5L);
addNodes(peWorkflowDAG, node1, node2, node3, node4, node5);
PEWorkflowDAG.Edge edge1_2 = new PEWorkflowDAG.Edge(1L, 2L, "false");
PEWorkflowDAG.Edge edge1_3 = new PEWorkflowDAG.Edge(1L, 3L, "true");
PEWorkflowDAG.Edge edge2_4 = new PEWorkflowDAG.Edge(2L, 4L);
PEWorkflowDAG.Edge edge2_5 = new PEWorkflowDAG.Edge(2L, 5L);
PEWorkflowDAG.Edge edge3_5 = new PEWorkflowDAG.Edge(3L, 5L);
addEdges(peWorkflowDAG, edge1_2, edge1_3, edge2_4, edge2_5, edge3_5);
decisionNodeHandler.handle(node1, peWorkflowDAG, new WorkflowInstanceInfoDO());
// the "false" branch (2, 4) is fully disabled
Assertions.assertEquals(false, node2.getEnable());
Assertions.assertEquals(true, node2.getDisableByControlNode());
Assertions.assertEquals(false, node4.getEnable());
Assertions.assertEquals(true, node4.getDisableByControlNode());
// all edges leaving the disabled branch are disabled, node 5 is reached via 3 -> 5 instead
Assertions.assertEquals(false, edge1_2.getEnable());
Assertions.assertEquals(false, edge2_4.getEnable());
Assertions.assertEquals(false, edge2_5.getEnable());
}
// two chained decision nodes, second one disables the shared successor (node 5)
@Test
void testCase3() {
PEWorkflowDAG peWorkflowDAG = constructEmptyDAG();
PEWorkflowDAG.Node node1 = new PEWorkflowDAG.Node(1L, WorkflowNodeType.DECISION.getCode());
// decision node return true
node1.setNodeParams("true;");
PEWorkflowDAG.Node node2 = new PEWorkflowDAG.Node(2L, WorkflowNodeType.DECISION.getCode());
// decision node return true
node2.setNodeParams("true;");
PEWorkflowDAG.Node node3 = new PEWorkflowDAG.Node(3L);
PEWorkflowDAG.Node node4 = new PEWorkflowDAG.Node(4L);
PEWorkflowDAG.Node node5 = new PEWorkflowDAG.Node(5L);
addNodes(peWorkflowDAG, node1, node2, node3, node4, node5);
PEWorkflowDAG.Edge edge1_2 = new PEWorkflowDAG.Edge(1L, 2L, "true");
PEWorkflowDAG.Edge edge1_3 = new PEWorkflowDAG.Edge(1L, 3L, "false");
PEWorkflowDAG.Edge edge2_5 = new PEWorkflowDAG.Edge(2L, 5L, "false");
PEWorkflowDAG.Edge edge2_4 = new PEWorkflowDAG.Edge(2L, 4L, "true");
PEWorkflowDAG.Edge edge3_5 = new PEWorkflowDAG.Edge(3L, 5L);
addEdges(peWorkflowDAG, edge1_2, edge1_3, edge2_4, edge2_5, edge3_5);
// after handling the first decision node
decisionNodeHandler.handle(node1, peWorkflowDAG, new WorkflowInstanceInfoDO());
Assertions.assertEquals(false, node3.getEnable());
Assertions.assertEquals(true, node3.getDisableByControlNode());
// edges downstream of node 3 are disabled; 2 -> 5 is untouched so far
Assertions.assertEquals(false, edge1_3.getEnable());
Assertions.assertEquals(false, edge3_5.getEnable());
Assertions.assertNull(edge2_5.getEnable());
// node 5 is still in its initial state
Assertions.assertNull(node5.getEnable());
// handle the second decision node
decisionNodeHandler.handle(node2, peWorkflowDAG, new WorkflowInstanceInfoDO());
// node 5 is disabled: its last remaining inbound edge (2 -> 5, "false") lost
Assertions.assertFalse(node5.getEnable());
Assertions.assertFalse(edge2_5.getEnable());
}
// like testCase3, but the second decision keeps 2 -> 5, so node 5 stays enabled
@Test
void testCase4() {
PEWorkflowDAG peWorkflowDAG = constructEmptyDAG();
PEWorkflowDAG.Node node1 = new PEWorkflowDAG.Node(1L, WorkflowNodeType.DECISION.getCode());
// decision node return true
node1.setNodeParams("true;");
PEWorkflowDAG.Node node2 = new PEWorkflowDAG.Node(2L, WorkflowNodeType.DECISION.getCode());
// decision node return true
node2.setNodeParams("true;");
PEWorkflowDAG.Node node3 = new PEWorkflowDAG.Node(3L);
PEWorkflowDAG.Node node4 = new PEWorkflowDAG.Node(4L);
PEWorkflowDAG.Node node5 = new PEWorkflowDAG.Node(5L);
addNodes(peWorkflowDAG, node1, node2, node3, node4, node5);
PEWorkflowDAG.Edge edge1_2 = new PEWorkflowDAG.Edge(1L, 2L, "true");
PEWorkflowDAG.Edge edge1_3 = new PEWorkflowDAG.Edge(1L, 3L, "false");
PEWorkflowDAG.Edge edge2_5 = new PEWorkflowDAG.Edge(2L, 5L, "true");
PEWorkflowDAG.Edge edge2_4 = new PEWorkflowDAG.Edge(2L, 4L, "false");
PEWorkflowDAG.Edge edge3_5 = new PEWorkflowDAG.Edge(3L, 5L);
addEdges(peWorkflowDAG, edge1_2, edge1_3, edge2_4, edge2_5, edge3_5);
// after handling the first decision node
decisionNodeHandler.handle(node1, peWorkflowDAG, new WorkflowInstanceInfoDO());
Assertions.assertEquals(false, node3.getEnable());
Assertions.assertEquals(true, node3.getDisableByControlNode());
// edges downstream of node 3 are disabled; 2 -> 5 is untouched so far
Assertions.assertEquals(false, edge1_3.getEnable());
Assertions.assertEquals(false, edge3_5.getEnable());
Assertions.assertNull(edge2_5.getEnable());
// node 5 is still in its initial state
Assertions.assertNull(node5.getEnable());
// handle the second decision node
decisionNodeHandler.handle(node2, peWorkflowDAG, new WorkflowInstanceInfoDO());
// node 5 is still in its initial state (kept alive by 2 -> 5, "true")
Assertions.assertNull(node5.getEnable());
Assertions.assertFalse(node4.getEnable());
Assertions.assertTrue(node4.getDisableByControlNode());
Assertions.assertFalse(edge2_4.getEnable());
}
}

View File

@ -1,10 +1,8 @@
package tech.powerjob.server.test;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import com.google.common.collect.Lists;
@ -56,19 +54,11 @@ public class DAGTest {
Assert.assertTrue(WorkflowDAGUtils.valid(validPEDAG));
WorkflowDAG wfDAG = WorkflowDAGUtils.convert(validPEDAG);
System.out.println("jackson");
System.out.println(JsonUtils.toJSONString(wfDAG));
// Jackson 不知道怎么序列化引用只能放弃使用 FastJSON 序列化引用 $ref
WorkflowDAG wfDAGByJackSon = JsonUtils.parseObject(JsonUtils.toJSONString(wfDAG), WorkflowDAG.class);
System.out.println("fastJson");
System.out.println(JSONObject.toJSONString(wfDAG));
WorkflowDAG wfDAGByFastJSON = JSONObject.parseObject(JSONObject.toJSONString(wfDAG), WorkflowDAG.class);
// 打断点看 reference 关系
System.out.println(wfDAGByJackSon);
System.out.println(wfDAGByFastJSON);
Assert.assertEquals(1, wfDAG.getRoots().size());
WorkflowDAG.Node node = wfDAG.getNode(3L);
Assert.assertEquals(1, (long) node.getDependencies().get(0).getNodeId());
Assert.assertEquals(4, (long) node.getSuccessors().get(0).getNodeId());
}
@Test
@ -151,7 +141,6 @@ public class DAGTest {
}
/**
* @author Echo009
* @since 2021/02/07