From c15cefc447efb09f8f9aacc997e7ebce6ba733e8 Mon Sep 17 00:00:00 2001 From: Echo009 Date: Thu, 23 Dec 2021 14:13:32 +0800 Subject: [PATCH] feat: support decision node #188 --- .../common/enums/WorkflowNodeType.java | 26 ++- .../powerjob/common/model/PEWorkflowDAG.java | 50 +++++- .../server/core/evaluator/Evaluator.java | 18 ++ .../core/evaluator/JavaScriptEvaluator.java | 30 ++++ .../server/core/instance/InstanceService.java | 2 +- .../service/WorkflowNodeHandleService.java | 101 +++++++++++ .../workflow/WorkflowInstanceManager.java | 120 ++++++------- .../core/workflow/algorithm/WorkflowDAG.java | 66 ++++--- .../workflow/algorithm/WorkflowDAGUtils.java | 67 ++++++- .../workflow/hanlder/ControlNodeHandler.java | 22 +++ .../workflow/hanlder/TaskNodeHandler.java | 29 +++ .../hanlder/WorkflowNodeHandlerMarker.java | 20 +++ .../hanlder/impl/DecisionNodeHandler.java | 98 +++++++++++ .../workflow/hanlder/impl/JobNodeHandler.java | 56 ++++++ .../remote/model/WorkflowNodeInfoDO.java | 1 + .../server/core/data/DataConstructUtil.java | 32 ++++ .../evaluator/JavaScriptEvaluatorTest.java | 99 +++++++++++ .../hanlder/DecisionNodeHandlerTest.java | 165 ++++++++++++++++++ .../tech/powerjob/server/test/DAGTest.java | 19 +- 19 files changed, 902 insertions(+), 119 deletions(-) create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/evaluator/Evaluator.java create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/evaluator/JavaScriptEvaluator.java create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/WorkflowNodeHandleService.java create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/ControlNodeHandler.java create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/TaskNodeHandler.java create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/WorkflowNodeHandlerMarker.java create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/DecisionNodeHandler.java create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/JobNodeHandler.java create mode 100644 powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/data/DataConstructUtil.java create mode 100644 powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/evaluator/JavaScriptEvaluatorTest.java create mode 100644 powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/workflow/hanlder/DecisionNodeHandlerTest.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/enums/WorkflowNodeType.java b/powerjob-common/src/main/java/tech/powerjob/common/enums/WorkflowNodeType.java index 5428ebdd..808e523e 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/enums/WorkflowNodeType.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/enums/WorkflowNodeType.java @@ -15,9 +15,33 @@ public enum WorkflowNodeType { /** * 任务节点 */ - JOB(1); + JOB(1,false), + /** + * 判断节点 + */ + DECISION(2,true), + /** + * 内嵌工作流 + */ + NESTED_WORKFLOW(3,false), + ; private final int code; + /** + * 控制节点 + */ + private final boolean controlNode; + + public static WorkflowNodeType of(int code) { + for (WorkflowNodeType nodeType : values()) { + if (nodeType.code == code) { + return nodeType; + } + } + throw new IllegalArgumentException("unknown WorkflowNodeType of " + code); + } + + } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java b/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java index 61167661..64b53d5c 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java @@ -7,6 +7,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; +import tech.powerjob.common.enums.WorkflowNodeType; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -38,10 +39,10 @@ public class PEWorkflowDAG implements Serializable { @Data @Accessors(chain = true) @NoArgsConstructor - @AllArgsConstructor public static class Node implements Serializable { /** * node id + * * @since 20210128 */ private Long nodeId; @@ -49,6 +50,8 @@ public class PEWorkflowDAG implements Serializable { /** * note type + * + * @see WorkflowNodeType * @since 20210316 */ private Integer nodeType; @@ -61,23 +64,41 @@ public class PEWorkflowDAG implements Serializable { */ private String nodeName; - @JsonSerialize(using= ToStringSerializer.class) + @JsonSerialize(using = ToStringSerializer.class) private Long instanceId; - + /** + * for decision node, it is JavaScript code + */ private String nodeParams; private Integer status; - + /** + * for decision node, it only be can "true" or "false" + */ private String result; /** * instanceId will be null if disable . */ private Boolean enable; + /** + * mark node which disable by control node. + */ + private Boolean disableByControlNode; private Boolean skipWhenFailed; + private String startTime; + + private String finishedTime; + public Node(Long nodeId) { this.nodeId = nodeId; + this.nodeType = WorkflowNodeType.JOB.getCode(); + } + + public Node(Long nodeId, Integer nodeType) { + this.nodeId = nodeId; + this.nodeType = nodeType; } } @@ -86,10 +107,29 @@ public class PEWorkflowDAG implements Serializable { */ @Data @NoArgsConstructor - @AllArgsConstructor public static class Edge implements Serializable { + private Long from; + private Long to; + /** + * property,support for complex flow control + * for decision node , it can be "true" or "false" + */ + private String property; + + private Boolean enable; + + public Edge(long from, long to) { + this.from = from; + this.to = to; + } + + public Edge(long from, long to, String property) { + this.from = from; + this.to = to; + this.property = property; + } } public PEWorkflowDAG(@Nonnull List nodes, @Nullable List edges) { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/evaluator/Evaluator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/evaluator/Evaluator.java new file mode 100644 index 00000000..cf60b4f4 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/evaluator/Evaluator.java @@ -0,0 +1,18 @@ +package tech.powerjob.server.core.evaluator; + + +/** + * @author Echo009 + * @since 2021/12/10 + */ +public interface Evaluator { + /** + * 使用给定输入计算表达式 + * + * @param expression 可执行的表达式 + * @param input 输入 + * @return 计算结果 + */ + Object evaluate(String expression, Object input); + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/evaluator/JavaScriptEvaluator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/evaluator/JavaScriptEvaluator.java new file mode 100644 index 00000000..5033121b --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/evaluator/JavaScriptEvaluator.java @@ -0,0 +1,30 @@ +package tech.powerjob.server.core.evaluator; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.script.Bindings; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineManager; + +/** + * @author Echo009 + * @since 2021/12/10 + */ +@Slf4j +@Component +public class JavaScriptEvaluator implements Evaluator { + + private static final ScriptEngine ENGINE = new ScriptEngineManager().getEngineByName("nashorn"); + + + @Override + @SneakyThrows + public Object evaluate(String expression, Object input) { + Bindings bindings = ENGINE.createBindings(); + bindings.put("context", input); + return ENGINE.eval(expression, bindings); + } + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java index 275ec1df..bd506b5d 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java @@ -78,7 +78,7 @@ public class InstanceService { * @param expectTriggerTime 预期执行时间 * @return 任务实例ID */ - public Long create(Long jobId, Long appId, String jobParams, String instanceParams, Long wfInstanceId, Long expectTriggerTime) { + public Long create(Long jobId, Long appId, String jobParams, String instanceParams, Long wfInstanceId, Long expectTriggerTime) { Long instanceId = idGenerateService.allocate(); Date now = new Date(); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/WorkflowNodeHandleService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/WorkflowNodeHandleService.java new file mode 100644 index 00000000..cc3e5824 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/WorkflowNodeHandleService.java @@ -0,0 +1,101 @@ +package tech.powerjob.server.core.service; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.server.core.workflow.hanlder.ControlNodeHandler; +import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler; +import tech.powerjob.server.core.workflow.hanlder.WorkflowNodeHandlerMarker; +import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; +import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; + +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +/** + * @author Echo009 + * @since 2021/12/9 + */ +@Slf4j +@Service +public class WorkflowNodeHandleService { + + private final Map controlNodeHandlerContainer; + + private final Map taskNodeHandlerContainer; + + private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository; + + public WorkflowNodeHandleService(List controlNodeHandlerList, List taskNodeHandlerList, WorkflowInstanceInfoRepository workflowInstanceInfoRepository) { + // init + controlNodeHandlerContainer = new EnumMap<>(WorkflowNodeType.class); + taskNodeHandlerContainer = new EnumMap<>(WorkflowNodeType.class); + controlNodeHandlerList.forEach(controlNodeHandler -> controlNodeHandlerContainer.put(controlNodeHandler.matchingType(), controlNodeHandler)); + taskNodeHandlerList.forEach(taskNodeHandler -> taskNodeHandlerContainer.put(taskNodeHandler.matchingType(), taskNodeHandler)); + // + this.workflowInstanceInfoRepository = workflowInstanceInfoRepository; + } + + /** + * 处理任务节点 + * 注意,上层调用方必须保证这里的 taskNodeList 不能为空 + */ + public void handleTaskNodes(List taskNodeList, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) { + + // 创建任务实例 + taskNodeList.forEach(taskNode -> { + // 注意:这里必须保证任务实例全部创建成功,如果在这里创建实例部分失败,会导致 DAG 信息不会更新,已经生成的实例节点在工作流日志中没法展示 + TaskNodeHandler taskNodeHandler = (TaskNodeHandler) findMatchingHandler(taskNode); + taskNodeHandler.createTaskInstance(taskNode, dag, wfInstanceInfo); + log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={})", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), taskNode.getNodeId(), taskNode.getJobId()); + }); + // 持久化工作流实例信息 + wfInstanceInfo.setDag(JSON.toJSONString(dag)); + workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo); + // 启动 + taskNodeList.forEach(taskNode -> { + TaskNodeHandler taskNodeHandler = (TaskNodeHandler) findMatchingHandler(taskNode); + taskNodeHandler.startTaskInstance(taskNode); + }); + + + } + + /** + * 处理控制节点 + * 注意,上层调用方必须保证这里的 controlNodeList 不能为空 + */ + public void handleControlNodes(List controlNodeList, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) { + for (PEWorkflowDAG.Node node : controlNodeList) { + handleControlNode(node, dag, wfInstanceInfo); + } + } + + public void handleControlNode(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) { + ControlNodeHandler controlNodeHandler = (ControlNodeHandler) findMatchingHandler(node); + node.setStartTime(CommonUtils.formatTime(System.currentTimeMillis())); + controlNodeHandler.handle(node, dag, wfInstanceInfo); + node.setFinishedTime(CommonUtils.formatTime(System.currentTimeMillis())); + } + + + private WorkflowNodeHandlerMarker findMatchingHandler(PEWorkflowDAG.Node node) { + WorkflowNodeType nodeType = WorkflowNodeType.of(node.getNodeType()); + WorkflowNodeHandlerMarker res; + if (!nodeType.isControlNode()) { + res = taskNodeHandlerContainer.get(nodeType); + } else { + res = controlNodeHandlerContainer.get(nodeType); + } + if (res == null) { + // impossible + throw new UnsupportedOperationException("unsupported node type : " + nodeType); + } + return res; + } + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java index a1e6aedb..e9fa802a 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java @@ -22,6 +22,7 @@ import tech.powerjob.server.core.DispatchService; import tech.powerjob.server.core.instance.InstanceService; import tech.powerjob.server.core.lock.UseSegmentLock; import tech.powerjob.server.core.service.UserService; +import tech.powerjob.server.core.service.WorkflowNodeHandleService; import tech.powerjob.server.core.uid.IdGenerateService; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; import tech.powerjob.server.extension.defaultimpl.alram.AlarmCenter; @@ -34,6 +35,7 @@ import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoReposi import javax.annotation.Resource; import java.util.*; +import java.util.stream.Collectors; import static tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils.isNotAllowSkipWhenFailed; @@ -67,6 +69,8 @@ public class WorkflowInstanceManager { private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; @Resource private WorkflowNodeInfoRepository workflowNodeInfoRepository; + @Resource + private WorkflowNodeHandleService workflowNodeHandleService; /** * 创建工作流任务实例 @@ -127,14 +131,14 @@ public class WorkflowInstanceManager { */ private void initNodeInfo(PEWorkflowDAG dag) { for (PEWorkflowDAG.Node node : dag.getNodes()) { - WorkflowNodeInfoDO workflowNodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(()->new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_NODE)); + WorkflowNodeInfoDO workflowNodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_NODE)); // 任务节点,需初始化 是否启用、是否允许失败跳过、节点参数 等信息 if (workflowNodeInfo.getType() == null || workflowNodeInfo.getType() == WorkflowNodeType.JOB.getCode()) { // 任务节点缺失任务信息 if (workflowNodeInfo.getJobId() == null) { throw new PowerJobException(SystemInstanceResult.ILLEGAL_NODE); } - JobInfoDO jobInfo = jobInfoRepository.findById(workflowNodeInfo.getJobId()).orElseThrow(()->new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_JOB)); + JobInfoDO jobInfo = jobInfoRepository.findById(workflowNodeInfo.getJobId()).orElseThrow(() -> new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_JOB)); node.setNodeType(WorkflowNodeType.JOB.getCode()); // 初始化任务相关信息 @@ -216,46 +220,40 @@ public class WorkflowInstanceManager { return; } } - try { // 从实例中读取工作流信息 PEWorkflowDAG dag = JSON.parseObject(wfInstanceInfo.getDag(), PEWorkflowDAG.class); // 根节点有可能被 disable List readyNodes = WorkflowDAGUtils.listReadyNodes(dag); - // 创建所有的根任务 - readyNodes.forEach(readyNode -> { - // 注意:这里必须保证任务实例全部创建成功,如果在这里创建实例部分失败,会导致 DAG 信息不会更新,已经生成的实例节点在工作流日志中没法展示 - // instanceParam 传递的是工作流实例的 wfContext - Long instanceId = instanceService.create(readyNode.getJobId(), wfInfo.getAppId(), readyNode.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis()); - readyNode.setInstanceId(instanceId); - readyNode.setStatus(InstanceStatus.RUNNING.getV()); - - log.info("[Workflow-{}|{}] create readyNode instance(nodeId={},jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), instanceId); - }); - - // 持久化 - wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV()); - wfInstanceInfo.setDag(JSON.toJSONString(dag)); + // 先处理其中的控制节点 + List controlNodes = findControlNodes(readyNodes); + while (!controlNodes.isEmpty()) { + workflowNodeHandleService.handleControlNodes(controlNodes, dag, wfInstanceInfo); + readyNodes = WorkflowDAGUtils.listReadyNodes(dag); + controlNodes = findControlNodes(readyNodes); + } if (readyNodes.isEmpty()) { // 没有就绪的节点(所有节点都被禁用) wfInstanceInfo.setStatus(WorkflowInstanceStatus.SUCCEED.getV()); wfInstanceInfo.setResult(SystemInstanceResult.NO_ENABLED_NODES); wfInstanceInfo.setFinishedTime(System.currentTimeMillis()); + wfInstanceInfo.setDag(JSON.toJSONString(dag)); log.warn("[Workflow-{}|{}] workflowInstance({}) needn't running ", wfInfo.getId(), wfInstanceId, wfInstanceInfo); workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo); return; } - workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo); + // 需要更新工作流实例状态 + wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV()); + // 处理任务节点 + workflowNodeHandleService.handleTaskNodes(readyNodes, dag, wfInstanceInfo); log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId); - - // 真正开始执行根任务 - readyNodes.forEach(this::runInstance); } catch (Exception e) { log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e); onWorkflowInstanceFailed(e.getMessage(), wfInstanceInfo); } } + /** * 下一步(当工作流的某个任务完成时调用该方法) * ******************************************** @@ -313,14 +311,14 @@ public class WorkflowInstanceManager { wfInstance.setGmtModified(new Date()); wfInstance.setDag(JSON.toJSONString(dag)); - // 工作流已经结束(某个节点失败导致工作流整体已经失败),仅更新最新的DAG图 + // 工作流已经结束(某个节点失败导致工作流整体已经失败),仅更新最新的 DAG 图 if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { workflowInstanceInfoRepository.saveAndFlush(wfInstance); log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", wfId, wfInstanceId, wfInstance.getStatus()); return; } - // 任务失败 && 不允许失败跳过,DAG流程被打断,整体失败 + // 任务失败 && 不允许失败跳过,DAG 流程被打断,整体失败 if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) { log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId); onWorkflowInstanceFailed(SystemInstanceResult.MIDDLE_JOB_FAILED, wfInstance); @@ -329,55 +327,53 @@ public class WorkflowInstanceManager { // 子任务被手动停止 if (status == InstanceStatus.STOPPED) { - wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV()); - wfInstance.setResult(SystemInstanceResult.MIDDLE_JOB_STOPPED); - wfInstance.setFinishedTime(System.currentTimeMillis()); - workflowInstanceInfoRepository.saveAndFlush(wfInstance); - + updateWorkflowInstanceFinalStatus(wfInstance, SystemInstanceResult.MIDDLE_JOB_STOPPED, WorkflowInstanceStatus.STOPPED); log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", wfId, wfInstanceId, instanceId); return; } // 注意:这里会直接跳过 disable 的节点 List readyNodes = WorkflowDAGUtils.listReadyNodes(dag); - // 这里得重新更新一下,因为 WorkflowDAGUtils#listReadyNodes 可能会更新节点状态 - wfInstance.setDag(JSON.toJSONString(dag)); // 如果没有就绪的节点,需要再次判断是否已经全部完成 if (readyNodes.isEmpty() && isFinish(dag)) { allFinished = true; } // 工作流执行完毕(能执行到这里代表该工作流内所有子任务都执行成功了) if (allFinished) { - wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV()); + // 这里得重新更新一下,因为 WorkflowDAGUtils#listReadyNodes 可能会更新节点状态 + wfInstance.setDag(JSON.toJSONString(dag)); // 最终任务的结果作为整个 workflow 的结果 - wfInstance.setResult(result); - wfInstance.setFinishedTime(System.currentTimeMillis()); - workflowInstanceInfoRepository.saveAndFlush(wfInstance); - + updateWorkflowInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED); log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId); return; } - - for (PEWorkflowDAG.Node readyNode : readyNodes) { - // 同理:这里必须保证任务实例全部创建成功,避免部分失败导致已经生成的实例节点在工作流日志中没法展示 - // instanceParam 传递的是工作流实例的 wfContext - Long newInstanceId = instanceService.create(readyNode.getJobId(), wfInstance.getAppId(), readyNode.getNodeParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis()); - readyNode.setInstanceId(newInstanceId); - readyNode.setStatus(InstanceStatus.RUNNING.getV()); - log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), newInstanceId); + // 先处理其中的控制节点 + List controlNodes = findControlNodes(readyNodes); + while (!controlNodes.isEmpty()) { + workflowNodeHandleService.handleControlNodes(controlNodes, dag, wfInstance); + readyNodes = WorkflowDAGUtils.listReadyNodes(dag); + controlNodes = findControlNodes(readyNodes); } - // 这里也得更新 DAG 信息 - wfInstance.setDag(JSON.toJSONString(dag)); - workflowInstanceInfoRepository.saveAndFlush(wfInstance); - // 持久化结束后,开始调度执行所有的任务 - readyNodes.forEach(this::runInstance); - + // 再次判断是否已完成 (允许控制节点出现在末尾) + if (readyNodes.isEmpty()) { + if (isFinish(dag)) { + wfInstance.setDag(JSON.toJSONString(dag)); + updateWorkflowInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED); + log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId); + return; + } + // 没有就绪的节点 但 还没执行完成,仅更新 DAG + wfInstance.setDag(JSON.toJSONString(dag)); + workflowInstanceInfoRepository.saveAndFlush(wfInstance); + return; + } + // 处理任务节点 + workflowNodeHandleService.handleTaskNodes(readyNodes, dag, wfInstance); } catch (Exception e) { onWorkflowInstanceFailed("MOVE NEXT STEP FAILED: " + e.getMessage(), wfInstance); log.error("[Workflow-{}|{}] update failed.", wfId, wfInstanceId, e); } } - /** * 更新工作流上下文 * fix : 得和其他操作工作流实例的方法用同一把锁才行,不然有并发问题,会导致节点状态被覆盖 @@ -412,21 +408,21 @@ public class WorkflowInstanceManager { } - /** - * 运行任务实例 - * 需要将创建和运行任务实例分离,否则在秒失败情况下,会发生DAG覆盖更新的问题 - * - * @param node 节点信息 - */ - private void runInstance(PEWorkflowDAG.Node node) { - - JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new); - // 洗去时间表达式类型 - jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV()); - dispatchService.dispatch(jobInfo, node.getInstanceId()); + private void updateWorkflowInstanceFinalStatus(WorkflowInstanceInfoDO wfInstance, String result, WorkflowInstanceStatus workflowInstanceStatus) { + wfInstance.setStatus(workflowInstanceStatus.getV()); + wfInstance.setResult(result); + wfInstance.setFinishedTime(System.currentTimeMillis()); + workflowInstanceInfoRepository.saveAndFlush(wfInstance); } + private List findControlNodes(List readyNodes) { + return readyNodes.stream().filter(node -> { + WorkflowNodeType nodeType = WorkflowNodeType.of(node.getNodeType()); + return nodeType.isControlNode(); + }).collect(Collectors.toList()); + } + private boolean isFinish(PEWorkflowDAG dag) { for (PEWorkflowDAG.Node node : dag.getNodes()) { if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAG.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAG.java index 87612b5f..091be0f4 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAG.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAG.java @@ -1,19 +1,23 @@ package tech.powerjob.server.core.workflow.algorithm; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.*; +import tech.powerjob.common.model.PEWorkflowDAG; import java.util.List; +import java.util.Map; /** * DAG 工作流对象 - * 使用引用,易于计算(不再参与主运算,起辅助作用) + * 节点中均记录了上游以及下游的连接关系(无法使用 JSON 来序列化以及反序列化) * * @author tjq + * @author Echo009 * @since 2020/5/26 */ @Data +@ToString(exclude = {"nodeMap"}) @NoArgsConstructor @AllArgsConstructor public class WorkflowDAG { @@ -23,22 +27,31 @@ public class WorkflowDAG { */ private List roots; - @Data + private Map nodeMap; + + public Node getNode(Long nodeId) { + if (nodeMap == null) { + return null; + } + return nodeMap.get(nodeId); + } + + @Getter + @Setter + @EqualsAndHashCode(exclude = {"dependencies", "dependenceEdgeMap", "successorEdgeMap", "holder","successors"}) + @ToString(exclude = {"dependencies", "dependenceEdgeMap", "successorEdgeMap", "holder"}) @NoArgsConstructor public static final class Node { - public Node(List successors, Long nodeId, Long jobId, String jobName, int status) { - this.successors = successors; - this.nodeId = nodeId; - this.jobId = jobId; - this.jobName = jobName; - this.status = status; + public Node(PEWorkflowDAG.Node node) { + this.nodeId = node.getNodeId(); + this.holder = node; + this.dependencies = Lists.newLinkedList(); + this.dependenceEdgeMap = Maps.newHashMap(); + this.successors = Lists.newLinkedList(); + this.successorEdgeMap = Maps.newHashMap(); } - /** - * 后继者,子节点 - */ - private List successors; /** * node id * @@ -46,24 +59,23 @@ public class WorkflowDAG { */ private Long nodeId; - private Long jobId; - - private String jobName; + private PEWorkflowDAG.Node holder; /** - * 运行时信息 + * 依赖的上游节点 */ - private Long instanceId; + private List dependencies; /** - * 状态 WAITING_DISPATCH -> RUNNING -> SUCCEED/FAILED/STOPPED + * 连接依赖节点的边 */ - private int status; - - private String result; + private Map dependenceEdgeMap; /** - * instanceId will be null if disable . + * 后继者,子节点 */ - private Boolean enable; + private List successors; + /** + * 连接后继节点的边 + */ + private Map successorEdgeMap; - private Boolean skipWhenFailed; } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAGUtils.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAGUtils.java index e685ea37..4bbe219f 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAGUtils.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAGUtils.java @@ -1,11 +1,11 @@ package tech.powerjob.server.core.workflow.algorithm; +import com.google.common.collect.*; +import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.serialize.JsonUtils; -import com.google.common.collect.*; import java.util.*; @@ -60,7 +60,7 @@ public class WorkflowDAGUtils { WorkflowDAG dag = convert(peWorkflowDAG); // 检查所有顶点的路径 for (WorkflowDAG.Node root : dag.getRoots()) { - if (invalidPath(root, Sets.newHashSet(),traversalNodeIds)) { + if (invalidPath(root, Sets.newHashSet(), traversalNodeIds)) { return false; } } @@ -195,6 +195,56 @@ public class WorkflowDAGUtils { // 默认不允许跳过 return node.getSkipWhenFailed() == null || !node.getSkipWhenFailed(); } + + /** + * 处理被 disable 掉的边 + * 1. 将仅能通过被 disable 掉的边可达的节点标记为 disable(disableByControlNode),将状态更新为已取消 + * 2. 将这些被 disable 掉的节点的出口边都标记为 disable + * 3. 递归调用自身,继续处理被 disable 的边 + */ + @SuppressWarnings("squid:S3776") + public static void handleDisableEdges(List disableEdges, WorkflowDAG dag) { + if (disableEdges.isEmpty()) { + return; + } + List disableNodes = Lists.newArrayList(); + // 处理边上的节点,如果该节点仅能通过被 disable 掉的边可达,那么将该节点标记为 disable ,disableByControlNode ,并且将状态更新为 已取消 + for (PEWorkflowDAG.Edge disableEdge : disableEdges) { + WorkflowDAG.Node toNode = dag.getNode(disableEdge.getTo()); + // 判断是否仅能通过被 disable 掉的边可达 + Collection dependenceEdges = toNode.getDependenceEdgeMap().values(); + boolean shouldBeDisable = true; + for (PEWorkflowDAG.Edge dependenceEdge : dependenceEdges) { + if (dependenceEdge.getEnable() == null || dependenceEdge.getEnable()) { + shouldBeDisable = false; + break; + } + } + if (shouldBeDisable) { + // disable + PEWorkflowDAG.Node node = toNode.getHolder(); + node.setEnable(false) + .setDisableByControlNode(true) + .setStatus(InstanceStatus.CANCELED.getV()); + disableNodes.add(node); + } + } + if (!disableNodes.isEmpty()) { + // 被 disable 掉的节点的出口边都会被标记为 disable + List targetEdges = Lists.newArrayList(); + for (PEWorkflowDAG.Node disableNode : disableNodes) { + WorkflowDAG.Node node = dag.getNode(disableNode.getNodeId()); + Collection edges = node.getSuccessorEdgeMap().values(); + for (PEWorkflowDAG.Edge edge : edges) { + edge.setEnable(false); + targetEdges.add(edge); + } + } + // 广度优先 继续处理被 disable 掉的边 + handleDisableEdges(targetEdges, dag); + } + } + /** * 将点线表示法的DAG图转化为引用表达法的DAG图 * @@ -212,9 +262,8 @@ public class WorkflowDAGUtils { // 创建节点 peWorkflowDAG.getNodes().forEach(node -> { Long nodeId = node.getNodeId(); - WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), node.getNodeId(), node.getJobId(), node.getNodeName(), InstanceStatus.WAITING_DISPATCH.getV()); + WorkflowDAG.Node n = new WorkflowDAG.Node(node); id2Node.put(nodeId, n); - // 初始阶段,每一个点都设为顶点 rootIds.add(nodeId); }); @@ -229,7 +278,9 @@ public class WorkflowDAGUtils { } from.getSuccessors().add(to); - + from.getSuccessorEdgeMap().put(to, edge); + to.getDependencies().add(from); + to.getDependenceEdgeMap().put(from, edge); // 被连接的点不可能成为 root,移除 rootIds.remove(to.getNodeId()); }); @@ -241,7 +292,7 @@ public class WorkflowDAGUtils { List roots = Lists.newLinkedList(); rootIds.forEach(id -> roots.add(id2Node.get(id))); - return new WorkflowDAG(roots); + return new WorkflowDAG(roots, id2Node); } @@ -257,7 +308,7 @@ public class WorkflowDAGUtils { } ids.add(root.getNodeId()); for (WorkflowDAG.Node node : root.getSuccessors()) { - if (invalidPath(node, Sets.newHashSet(ids),nodeIdContainer)) { + if (invalidPath(node, Sets.newHashSet(ids), nodeIdContainer)) { return true; } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/ControlNodeHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/ControlNodeHandler.java new file mode 100644 index 00000000..1f03b9eb --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/ControlNodeHandler.java @@ -0,0 +1,22 @@ +package tech.powerjob.server.core.workflow.hanlder; + +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; + +/** + * @author Echo009 + * @since 2021/12/9 + */ +public interface ControlNodeHandler extends WorkflowNodeHandlerMarker { + + /** + * 处理控制节点 + * + * @param node 需要被处理的目标节点 + * @param dag 节点所属 DAG + * @param wfInstanceInfo 节点所属工作流实例 + */ + void handle(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo); + + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/TaskNodeHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/TaskNodeHandler.java new file mode 100644 index 00000000..fba914a9 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/TaskNodeHandler.java @@ -0,0 +1,29 @@ +package tech.powerjob.server.core.workflow.hanlder; + +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; + +/** + * @author Echo009 + * @since 2021/12/9 + */ +public interface TaskNodeHandler extends WorkflowNodeHandlerMarker { + + /** + * 创建任务实例 + * + * @param node 目标节点 + * @param dag DAG + * @param wfInstanceInfo 工作流实例 + */ + void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo); + + /** + * 执行任务实例 + * + * @param node 目标节点 + */ + void startTaskInstance(PEWorkflowDAG.Node node); + + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/WorkflowNodeHandlerMarker.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/WorkflowNodeHandlerMarker.java new file mode 100644 index 00000000..bae4e4d1 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/WorkflowNodeHandlerMarker.java @@ -0,0 +1,20 @@ +package tech.powerjob.server.core.workflow.hanlder; + +import tech.powerjob.common.enums.WorkflowNodeType; + +/** + * @author Echo009 + * @since 2021/12/9 + */ +public interface WorkflowNodeHandlerMarker { + + + /** + * 返回能够处理的节点类型 + * @return matching node type + */ + WorkflowNodeType matchingType(); + + + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/DecisionNodeHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/DecisionNodeHandler.java new file mode 100644 index 00000000..fe4c7fda --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/DecisionNodeHandler.java @@ -0,0 +1,98 @@ +package tech.powerjob.server.core.workflow.hanlder.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.server.core.evaluator.JavaScriptEvaluator; +import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; +import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; +import tech.powerjob.server.core.workflow.hanlder.ControlNodeHandler; +import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; + +import java.util.*; + +/** + * @author Echo009 + * @since 2021/12/9 + */ +@Slf4j +@Component +public class DecisionNodeHandler implements ControlNodeHandler { + + private final JavaScriptEvaluator javaScriptEvaluator = new JavaScriptEvaluator(); + + /** + * 处理判断节点 + * 1. 执行脚本 + * 2. 根据返回值 disable 掉相应的边以及节点 + */ + @Override + public void handle(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) { + String script = node.getNodeParams(); + if (StringUtils.isBlank(script)) { + log.error("[Workflow-{}|{}]decision node's param is blank! nodeId:{}", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId()); + throw new PowerJobException("decision node's param is blank!"); + } + // wfContext must be a map + HashMap wfContext = JSON.parseObject(wfInstanceInfo.getWfContext(), new TypeReference>() { + }); + Object result; + try { + result = javaScriptEvaluator.evaluate(script, wfContext); + } catch (Exception e) { + log.error("[Workflow-{}|{}]failed to evaluate decision node,nodeId:{}", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), e); + throw new PowerJobException("can't evaluate decision node!"); + } + boolean finalRes; + if (result instanceof Boolean) { + finalRes = ((Boolean) result); + } else if (result instanceof Number) { + finalRes = ((Number) result).doubleValue() > 0; + } else { + log.error("[Workflow-{}|{}]decision node's return value is illegal,nodeId:{},result:{}", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), JsonUtils.toJSONString(result)); + throw new PowerJobException("decision node's return value is illegal!"); + } + handleDag(finalRes, node, dag); + } + + + private void handleDag(boolean res, PEWorkflowDAG.Node node, PEWorkflowDAG peDag) { + // 更新判断节点的状态为成功 + node.setResult(String.valueOf(res)); + node.setStatus(InstanceStatus.SUCCEED.getV()); + WorkflowDAG dag = WorkflowDAGUtils.convert(peDag); + // 根据节点的计算结果,将相应的边 disable + WorkflowDAG.Node targetNode = dag.getNode(node.getNodeId()); + Collection edges = targetNode.getSuccessorEdgeMap().values(); + if (edges.isEmpty()) { + return; + } + List disableEdges = new ArrayList<>(edges.size()); + for (PEWorkflowDAG.Edge edge : edges) { + // 这里一定不会出现异常 + boolean property = Boolean.parseBoolean(edge.getProperty()); + if (res != property) { + // disable + edge.setEnable(false); + disableEdges.add(edge); + } + } + WorkflowDAGUtils.handleDisableEdges(disableEdges,dag); + } + + + + + + @Override + public WorkflowNodeType matchingType() { + return WorkflowNodeType.DECISION; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/JobNodeHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/JobNodeHandler.java new file mode 100644 index 00000000..531123e9 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/JobNodeHandler.java @@ -0,0 +1,56 @@ +package tech.powerjob.server.core.workflow.hanlder.impl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.server.core.DispatchService; +import tech.powerjob.server.core.instance.InstanceService; +import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler; +import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; +import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; + +import javax.annotation.Resource; + +/** + * @author Echo009 + * @since 2021/12/9 + */ +@Slf4j +@Component +public class JobNodeHandler implements TaskNodeHandler { + + @Resource + private InstanceService instanceService; + + @Resource + private JobInfoRepository jobInfoRepository; + + @Resource + private DispatchService dispatchService; + + @Override + public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) { + // instanceParam 传递的是工作流实例的 wfContext + Long instanceId = instanceService.create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis()); + node.setInstanceId(instanceId); + node.setStatus(InstanceStatus.RUNNING.getV()); + log.info("[Workflow-{}|{}] create readyNode(JOB) instance(nodeId={},jobId={},instanceId={}) successfully~", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId(), instanceId); + } + + @Override + public void startTaskInstance(PEWorkflowDAG.Node node) { + JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new); + // 洗去时间表达式类型 + jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV()); + dispatchService.dispatch(jobInfo, node.getInstanceId()); + } + + @Override + public WorkflowNodeType matchingType() { + return WorkflowNodeType.JOB; + } +} diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/WorkflowNodeInfoDO.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/WorkflowNodeInfoDO.java index f6c15270..0e51a330 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/WorkflowNodeInfoDO.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/WorkflowNodeInfoDO.java @@ -40,6 +40,7 @@ public class WorkflowNodeInfoDO { private Integer type; /** * 任务 ID + * 对于嵌套工作流类型的节点而言,这里存储是工作流 ID */ private Long jobId; /** diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/data/DataConstructUtil.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/data/DataConstructUtil.java new file mode 100644 index 00000000..5871a9a0 --- /dev/null +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/data/DataConstructUtil.java @@ -0,0 +1,32 @@ +package tech.powerjob.server.core.data; + +import com.google.common.collect.Lists; +import tech.powerjob.common.model.PEWorkflowDAG; + +import java.util.List; + +/** + * @author Echo009 + * @since 2021/12/10 + */ +public class DataConstructUtil { + + public static void addNodes(PEWorkflowDAG dag, PEWorkflowDAG.Node... nodes) { + for (PEWorkflowDAG.Node node : nodes) { + dag.getNodes().add(node); + } + } + + public static void addEdges(PEWorkflowDAG dag, PEWorkflowDAG.Edge... edges) { + for (PEWorkflowDAG.Edge edge : edges) { + dag.getEdges().add(edge); + } + } + + public static PEWorkflowDAG constructEmptyDAG() { + List nodes = Lists.newLinkedList(); + List edges = Lists.newLinkedList(); + return new PEWorkflowDAG(nodes, edges); + } + +} diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/evaluator/JavaScriptEvaluatorTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/evaluator/JavaScriptEvaluatorTest.java new file mode 100644 index 00000000..e5fb58be --- /dev/null +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/evaluator/JavaScriptEvaluatorTest.java @@ -0,0 +1,99 @@ +package tech.powerjob.server.core.evaluator; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.junit.Assert; +import org.junit.Test; +import tech.powerjob.common.serialize.JsonUtils; + +import java.util.HashMap; + +/** + * @author Echo009 + * @since 2021/12/10 + */ +public class JavaScriptEvaluatorTest { + + private final JavaScriptEvaluator javaScriptEvaluator = new JavaScriptEvaluator(); + + private final HashMap SIMPLE_CONTEXT = new HashMap<>(); + + private final HashMap COMPLEX_CONTEXT = new HashMap<>(); + + { + // simple context + // {"k1":"1","k2":"\"2\"","k3":"false","k4":"1.1"} + SIMPLE_CONTEXT.put("k1", JsonUtils.toJSONString(1)); + SIMPLE_CONTEXT.put("k2", JsonUtils.toJSONString("2")); + SIMPLE_CONTEXT.put("k3", JsonUtils.toJSONString(false)); + SIMPLE_CONTEXT.put("k4", JsonUtils.toJSONString(1.1d)); + + // complex context + // {"array":"[1,2,3,4,5]","obj":"{\"id\":\"e3\",\"value\":3,\"sub\":{\"id\":\"e2\",\"value\":2,\"sub\":{\"id\":\"e1\",\"value\":1,\"sub\":null}}}","map":"{\"e1\":{\"id\":\"e1\",\"value\":1,\"sub\":null}}"} + COMPLEX_CONTEXT.put("array", JsonUtils.toJSONString(new int[]{1, 2, 3, 4, 5})); + Element e1 = new Element("e1",1,null); + Element e2 = new Element("e2",2,e1); + Element e3 = new Element("e3",3,e2); + COMPLEX_CONTEXT.put("obj", JsonUtils.toJSONString(e3)); + HashMap map = new HashMap<>(); + map.put("e1",e1); + COMPLEX_CONTEXT.put("map",JsonUtils.toJSONString(map)); + + } + + @Test + public void testSimpleEval1() { + Object res = javaScriptEvaluator.evaluate("var x = false; x;", null); + Assert.assertEquals(false, res); + } + + @Test + public void testSimpleEval2() { + Object res = javaScriptEvaluator.evaluate("var person = {name:'echo',tag:'y'}; person.name;", null); + Assert.assertEquals("echo", res); + } + + + @Test + public void testSimpleEval3() { + // inject simple context + Object res = javaScriptEvaluator.evaluate("var res = context.k3; res;", SIMPLE_CONTEXT); + Boolean s = JsonUtils.parseObjectUnsafe(res.toString(), Boolean.class); + Assert.assertEquals(false, s); + } + + @Test + public void testSimpleEval4() { + Object res = javaScriptEvaluator.evaluate("var res = JSON.parse(context.k3); res == false;", SIMPLE_CONTEXT); + Assert.assertEquals(true, res); + } + + @Test + public void testComplexEval1() { + // array + Object res = javaScriptEvaluator.evaluate("var res = JSON.parse(context.array); res[0] == 1;", COMPLEX_CONTEXT); + Assert.assertEquals(true, res); + // map + res = javaScriptEvaluator.evaluate("var map = JSON.parse(context.map); var e1 = map.e1; e1.value ",COMPLEX_CONTEXT); + Assert.assertEquals(1,res); + // object + res = javaScriptEvaluator.evaluate("var e3 = JSON.parse(context.obj); var e1 = e3.sub.sub; e1.value ",COMPLEX_CONTEXT); + Assert.assertEquals(1,res); + } + + + @Data + @AllArgsConstructor + @NoArgsConstructor + static class Element { + + private String id; + + private Integer value; + + private Element sub; + + } + +} diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/workflow/hanlder/DecisionNodeHandlerTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/workflow/hanlder/DecisionNodeHandlerTest.java new file mode 100644 index 00000000..22266a70 --- /dev/null +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/workflow/hanlder/DecisionNodeHandlerTest.java @@ -0,0 +1,165 @@ +package tech.powerjob.server.core.workflow.hanlder; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.server.core.workflow.hanlder.impl.DecisionNodeHandler; +import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; + + +import static tech.powerjob.server.core.data.DataConstructUtil.*; + +/** + * @author Echo009 + * @since 2021/12/9 + *

+ * 如有变动,请同步变更文档 + * https://www.yuque.com/powerjob/dev/bgw03h/edit?toc_node_uuid=V9igz9SZ30lF59bX + */ +class DecisionNodeHandlerTest { + + private final DecisionNodeHandler decisionNodeHandler = new DecisionNodeHandler(); + + + @Test + void testCase1() { + + PEWorkflowDAG peWorkflowDAG = constructEmptyDAG(); + PEWorkflowDAG.Node node1 = new PEWorkflowDAG.Node(1L, WorkflowNodeType.DECISION.getCode()); + // decision node return true + node1.setNodeParams("true;"); + PEWorkflowDAG.Node node2 = new PEWorkflowDAG.Node(2L); + PEWorkflowDAG.Node node3 = new PEWorkflowDAG.Node(3L); + PEWorkflowDAG.Node node4 = new PEWorkflowDAG.Node(4L); + addNodes(peWorkflowDAG, node1, node2, node3, node4); + + PEWorkflowDAG.Edge edge1_2 = new PEWorkflowDAG.Edge(1L, 2L, "false"); + PEWorkflowDAG.Edge edge1_3 = new PEWorkflowDAG.Edge(1L, 3L, "true"); + PEWorkflowDAG.Edge edge2_4 = new PEWorkflowDAG.Edge(2L, 4L); + addEdges(peWorkflowDAG, edge1_2, edge1_3, edge2_4); + + decisionNodeHandler.handle(node1, peWorkflowDAG, new WorkflowInstanceInfoDO()); + Assertions.assertEquals(false, node2.getEnable()); + Assertions.assertEquals(true, node2.getDisableByControlNode()); + Assertions.assertEquals(false, node4.getEnable()); + Assertions.assertEquals(true, node4.getDisableByControlNode()); + // + Assertions.assertEquals(false, edge1_2.getEnable()); + Assertions.assertEquals(false, edge2_4.getEnable()); + + } + + + @Test + void testCase2() { + + PEWorkflowDAG peWorkflowDAG = constructEmptyDAG(); + PEWorkflowDAG.Node node1 = new PEWorkflowDAG.Node(1L, WorkflowNodeType.DECISION.getCode()); + // decision node return true + node1.setNodeParams("true;"); + PEWorkflowDAG.Node node2 = new PEWorkflowDAG.Node(2L); + PEWorkflowDAG.Node node3 = new PEWorkflowDAG.Node(3L); + PEWorkflowDAG.Node node4 = new PEWorkflowDAG.Node(4L); + PEWorkflowDAG.Node node5 = new PEWorkflowDAG.Node(5L); + addNodes(peWorkflowDAG, node1, node2, node3, node4, node5); + + PEWorkflowDAG.Edge edge1_2 = new PEWorkflowDAG.Edge(1L, 2L, "false"); + PEWorkflowDAG.Edge edge1_3 = new PEWorkflowDAG.Edge(1L, 3L, "true"); + PEWorkflowDAG.Edge edge2_4 = new PEWorkflowDAG.Edge(2L, 4L); + PEWorkflowDAG.Edge edge2_5 = new PEWorkflowDAG.Edge(2L, 5L); + PEWorkflowDAG.Edge edge3_5 = new PEWorkflowDAG.Edge(3L, 5L); + addEdges(peWorkflowDAG, edge1_2, edge1_3, edge2_4, edge2_5, edge3_5); + + decisionNodeHandler.handle(node1, peWorkflowDAG, new WorkflowInstanceInfoDO()); + Assertions.assertEquals(false, node2.getEnable()); + Assertions.assertEquals(true, node2.getDisableByControlNode()); + Assertions.assertEquals(false, node4.getEnable()); + Assertions.assertEquals(true, node4.getDisableByControlNode()); + // + Assertions.assertEquals(false, edge1_2.getEnable()); + Assertions.assertEquals(false, edge2_4.getEnable()); + Assertions.assertEquals(false, edge2_5.getEnable()); + + } + + @Test + void testCase3() { + + PEWorkflowDAG peWorkflowDAG = constructEmptyDAG(); + PEWorkflowDAG.Node node1 = new PEWorkflowDAG.Node(1L, WorkflowNodeType.DECISION.getCode()); + // decision node return true + node1.setNodeParams("true;"); + PEWorkflowDAG.Node node2 = new PEWorkflowDAG.Node(2L, WorkflowNodeType.DECISION.getCode()); + // decision node return true + node2.setNodeParams("true;"); + PEWorkflowDAG.Node node3 = new PEWorkflowDAG.Node(3L); + PEWorkflowDAG.Node node4 = new PEWorkflowDAG.Node(4L); + PEWorkflowDAG.Node node5 = new PEWorkflowDAG.Node(5L); + addNodes(peWorkflowDAG, node1, node2, node3, node4, node5); + + PEWorkflowDAG.Edge edge1_2 = new PEWorkflowDAG.Edge(1L, 2L, "true"); + PEWorkflowDAG.Edge edge1_3 = new PEWorkflowDAG.Edge(1L, 3L, "false"); + PEWorkflowDAG.Edge edge2_5 = new PEWorkflowDAG.Edge(2L, 5L, "false"); + PEWorkflowDAG.Edge edge2_4 = new PEWorkflowDAG.Edge(2L, 4L, "true"); + PEWorkflowDAG.Edge edge3_5 = new PEWorkflowDAG.Edge(3L, 5L); + addEdges(peWorkflowDAG, edge1_2, edge1_3, edge2_4, edge2_5, edge3_5); + // 处理第一个判断节点后 + decisionNodeHandler.handle(node1, peWorkflowDAG, new WorkflowInstanceInfoDO()); + Assertions.assertEquals(false, node3.getEnable()); + Assertions.assertEquals(true, node3.getDisableByControlNode()); + // + Assertions.assertEquals(false, edge1_3.getEnable()); + Assertions.assertEquals(false, edge3_5.getEnable()); + Assertions.assertNull(edge2_5.getEnable()); + // 节点 5 还是初始状态 + Assertions.assertNull(node5.getEnable()); + // 处理第二个判断节点 + decisionNodeHandler.handle(node2, peWorkflowDAG, new WorkflowInstanceInfoDO()); + // 节点 5 被 disable + Assertions.assertFalse(node5.getEnable()); + Assertions.assertFalse(edge2_5.getEnable()); + } + + + @Test + void testCase4() { + + PEWorkflowDAG peWorkflowDAG = constructEmptyDAG(); + PEWorkflowDAG.Node node1 = new PEWorkflowDAG.Node(1L, WorkflowNodeType.DECISION.getCode()); + // decision node return true + node1.setNodeParams("true;"); + PEWorkflowDAG.Node node2 = new PEWorkflowDAG.Node(2L, WorkflowNodeType.DECISION.getCode()); + // decision node return true + node2.setNodeParams("true;"); + PEWorkflowDAG.Node node3 = new PEWorkflowDAG.Node(3L); + PEWorkflowDAG.Node node4 = new PEWorkflowDAG.Node(4L); + PEWorkflowDAG.Node node5 = new PEWorkflowDAG.Node(5L); + addNodes(peWorkflowDAG, node1, node2, node3, node4, node5); + + PEWorkflowDAG.Edge edge1_2 = new PEWorkflowDAG.Edge(1L, 2L, "true"); + PEWorkflowDAG.Edge edge1_3 = new PEWorkflowDAG.Edge(1L, 3L, "false"); + PEWorkflowDAG.Edge edge2_5 = new PEWorkflowDAG.Edge(2L, 5L, "true"); + PEWorkflowDAG.Edge edge2_4 = new PEWorkflowDAG.Edge(2L, 4L, "false"); + PEWorkflowDAG.Edge edge3_5 = new PEWorkflowDAG.Edge(3L, 5L); + addEdges(peWorkflowDAG, edge1_2, edge1_3, edge2_4, edge2_5, edge3_5); + // 处理第一个判断节点后 + decisionNodeHandler.handle(node1, peWorkflowDAG, new WorkflowInstanceInfoDO()); + Assertions.assertEquals(false, node3.getEnable()); + Assertions.assertEquals(true, node3.getDisableByControlNode()); + // + Assertions.assertEquals(false, edge1_3.getEnable()); + Assertions.assertEquals(false, edge3_5.getEnable()); + Assertions.assertNull(edge2_5.getEnable()); + // 节点 5 还是初始状态 + Assertions.assertNull(node5.getEnable()); + // 处理第二个判断节点 + decisionNodeHandler.handle(node2, peWorkflowDAG, new WorkflowInstanceInfoDO()); + // 节点 5 还是初始状态 + Assertions.assertNull(node5.getEnable()); + Assertions.assertFalse(node4.getEnable()); + Assertions.assertTrue(node4.getDisableByControlNode()); + Assertions.assertFalse(edge2_4.getEnable()); + } + +} diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/DAGTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/DAGTest.java index 83852efa..3a8078f3 100644 --- a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/DAGTest.java +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/DAGTest.java @@ -1,10 +1,8 @@ package tech.powerjob.server.test; -import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.core.JsonProcessingException; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.PEWorkflowDAG; -import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; import com.google.common.collect.Lists; @@ -56,19 +54,11 @@ public class DAGTest { Assert.assertTrue(WorkflowDAGUtils.valid(validPEDAG)); WorkflowDAG wfDAG = WorkflowDAGUtils.convert(validPEDAG); - System.out.println("jackson"); - System.out.println(JsonUtils.toJSONString(wfDAG)); - // Jackson 不知道怎么序列化引用,只能放弃,使用 FastJSON 序列化引用,即 $ref - WorkflowDAG wfDAGByJackSon = JsonUtils.parseObject(JsonUtils.toJSONString(wfDAG), WorkflowDAG.class); - - System.out.println("fastJson"); - System.out.println(JSONObject.toJSONString(wfDAG)); - WorkflowDAG wfDAGByFastJSON = JSONObject.parseObject(JSONObject.toJSONString(wfDAG), WorkflowDAG.class); - - // 打断点看 reference 关系 - System.out.println(wfDAGByJackSon); - System.out.println(wfDAGByFastJSON); + Assert.assertEquals(1, wfDAG.getRoots().size()); + WorkflowDAG.Node node = wfDAG.getNode(3L); + Assert.assertEquals(1, (long) node.getDependencies().get(0).getNodeId()); + Assert.assertEquals(4, (long) node.getSuccessors().get(0).getNodeId()); } @Test @@ -151,7 +141,6 @@ public class DAGTest { } - /** * @author Echo009 * @since 2021/02/07