From 9194641c6fe07b3e40eec7bfc8c728eba7a0fff6 Mon Sep 17 00:00:00 2001 From: Echo009 Date: Sat, 15 Jan 2022 15:52:59 +0800 Subject: [PATCH] fix: workflow node validator --- .../powerjob/client/test/TestWorkflow.java | 6 ++-- .../powerjob/common/model/PEWorkflowDAG.java | 9 ++---- .../request/http/SaveWorkflowNodeRequest.java | 9 +++--- .../core/service/NodeValidateService.java | 31 +++++++++++++------ .../core/validator/DecisionNodeValidator.java | 19 +++++++----- .../core/validator/JobNodeValidator.java | 9 ++++-- .../NestedWorkflowNodeValidator.java | 22 +++++++++++-- .../server/core/validator/NodeValidator.java | 12 +++++-- .../workflow/WorkflowInstanceManager.java | 28 ++++++++--------- .../server/core/workflow/WorkflowService.java | 25 +++------------ .../impl/NestedWorkflowNodeHandler.java | 8 ++--- .../core/validator/NodeValidatorTest.java | 8 ++++- 12 files changed, 108 insertions(+), 78 deletions(-) diff --git a/powerjob-client/src/test/java/tech/powerjob/client/test/TestWorkflow.java b/powerjob-client/src/test/java/tech/powerjob/client/test/TestWorkflow.java index 75d6cae2..8443d8a5 100644 --- a/powerjob-client/src/test/java/tech/powerjob/client/test/TestWorkflow.java +++ b/powerjob-client/src/test/java/tech/powerjob/client/test/TestWorkflow.java @@ -71,18 +71,18 @@ class TestWorkflow extends ClientInitializer { SaveWorkflowNodeRequest saveWorkflowNodeRequest1 = new SaveWorkflowNodeRequest(); saveWorkflowNodeRequest1.setJobId(1L); saveWorkflowNodeRequest1.setNodeName("DAG-Node-1"); - saveWorkflowNodeRequest1.setType(WorkflowNodeType.JOB); + saveWorkflowNodeRequest1.setType(WorkflowNodeType.JOB.getCode()); SaveWorkflowNodeRequest saveWorkflowNodeRequest2 = new SaveWorkflowNodeRequest(); saveWorkflowNodeRequest2.setJobId(1L); saveWorkflowNodeRequest2.setNodeName("DAG-Node-2"); - saveWorkflowNodeRequest2.setType(WorkflowNodeType.JOB); + saveWorkflowNodeRequest2.setType(WorkflowNodeType.JOB.getCode()); SaveWorkflowNodeRequest saveWorkflowNodeRequest3 = new SaveWorkflowNodeRequest(); saveWorkflowNodeRequest3.setJobId(1L); saveWorkflowNodeRequest3.setNodeName("DAG-Node-3"); - saveWorkflowNodeRequest3.setType(WorkflowNodeType.JOB); + saveWorkflowNodeRequest3.setType(WorkflowNodeType.JOB.getCode()); List nodeList = powerJobClient.saveWorkflowNode(Lists.newArrayList(saveWorkflowNodeRequest1,saveWorkflowNodeRequest2,saveWorkflowNodeRequest3)).getData(); diff --git a/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java b/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java index a8a5ab88..8fd13536 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java @@ -55,14 +55,11 @@ public class PEWorkflowDAG implements Serializable { */ private Integer nodeType; /** - * job id - */ - private Long jobId; - /** - * workflow id,support for nested workflow + * job id or workflow id (if this Node type is a nested workflow) + * * @see WorkflowNodeType#NESTED_WORKFLOW */ - private Long wfId; + private Long jobId; /** * node name */ diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveWorkflowNodeRequest.java b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveWorkflowNodeRequest.java index 986fdd83..506be4e8 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveWorkflowNodeRequest.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveWorkflowNodeRequest.java @@ -1,8 +1,8 @@ package tech.powerjob.common.request.http; +import lombok.Data; import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.utils.CommonUtils; -import lombok.Data; @@ -22,7 +22,7 @@ public class SaveWorkflowNodeRequest { /** * 节点类型(默认为任务节点) */ - private WorkflowNodeType type = WorkflowNodeType.JOB; + private Integer type; /** * 任务 ID */ @@ -44,10 +44,11 @@ public class SaveWorkflowNodeRequest { */ private Boolean skipWhenFailed = false; - public void valid(){ + public void valid() { CommonUtils.requireNonNull(this.appId, "appId can't be empty"); CommonUtils.requireNonNull(this.type, "type can't be empty"); - if (type == WorkflowNodeType.JOB) { + final WorkflowNodeType workflowNodeType = WorkflowNodeType.of(type); + if (workflowNodeType == WorkflowNodeType.JOB || workflowNodeType == WorkflowNodeType.NESTED_WORKFLOW) { CommonUtils.requireNonNull(this.jobId, "jobId can't be empty"); } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/NodeValidateService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/NodeValidateService.java index 70ae40a4..c5074b27 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/NodeValidateService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/NodeValidateService.java @@ -3,9 +3,9 @@ package tech.powerjob.server.core.service; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import tech.powerjob.common.enums.WorkflowNodeType; -import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.server.core.validator.NodeValidator; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; +import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO; import java.util.EnumMap; import java.util.List; @@ -27,19 +27,30 @@ public class NodeValidateService { } - public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) { - Integer nodeTypeCode = node.getNodeType(); - if (nodeTypeCode == null) { - // 前向兼容,默认为 任务节点 - nodeTypeCode = WorkflowNodeType.JOB.getCode(); - } - WorkflowNodeType nodeType = WorkflowNodeType.of(nodeTypeCode); - NodeValidator nodeValidator = nodeValidatorMap.get(nodeType); + public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) { + NodeValidator nodeValidator = getNodeValidator(node); if (nodeValidator == null) { // 默认不需要校验 return; } - nodeValidator.validate(node, dag); + nodeValidator.complexValidate(node, dag); } + public void simpleValidate(WorkflowNodeInfoDO node) { + NodeValidator nodeValidator = getNodeValidator(node); + if (nodeValidator == null) { + // 默认不需要校验 + return; + } + nodeValidator.simpleValidate(node); + } + + private NodeValidator getNodeValidator(WorkflowNodeInfoDO node) { + Integer nodeTypeCode = node.getType(); + if (nodeTypeCode == null) { + // 前向兼容,默认为 任务节点 + return nodeValidatorMap.get(WorkflowNodeType.JOB); + } + return nodeValidatorMap.get(WorkflowNodeType.of(nodeTypeCode)); + } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/DecisionNodeValidator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/DecisionNodeValidator.java index 1e8abfcc..ab560291 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/DecisionNodeValidator.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/DecisionNodeValidator.java @@ -7,6 +7,7 @@ import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; +import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO; import java.util.Collection; @@ -20,14 +21,9 @@ public class DecisionNodeValidator implements NodeValidator { @Override - public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) { - // 简单校验 - String nodeParams = node.getNodeParams(); - if (StringUtils.isBlank(nodeParams)) { - throw new PowerJobException("DecisionNode‘s param must be not null,node name : " + node.getNodeName()); - } + public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) { // 出度固定为 2 - WorkflowDAG.Node nodeWrapper = dag.getNode(node.getNodeId()); + WorkflowDAG.Node nodeWrapper = dag.getNode(node.getId()); Collection edges = nodeWrapper.getSuccessorEdgeMap().values(); if (edges.size() != 2) { throw new PowerJobException("DecisionNode‘s out-degree must be 2,node name : " + node.getNodeName()); @@ -52,6 +48,15 @@ public class DecisionNodeValidator implements NodeValidator { } + @Override + public void simpleValidate(WorkflowNodeInfoDO node) { + // 简单校验 + String nodeParams = node.getNodeParams(); + if (StringUtils.isBlank(nodeParams)) { + throw new PowerJobException("DecisionNode‘s param must be not null,node name : " + node.getNodeName()); + } + } + public static boolean isValidBooleanStr(String str) { return StringUtils.equalsIgnoreCase(str.trim(), "true") || StringUtils.equalsIgnoreCase(str.trim(), "false"); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/JobNodeValidator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/JobNodeValidator.java index cc056e1b..e95a34e5 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/JobNodeValidator.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/JobNodeValidator.java @@ -4,10 +4,10 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO; import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; import javax.annotation.Resource; @@ -24,7 +24,12 @@ public class JobNodeValidator implements NodeValidator { private JobInfoRepository jobInfoRepository; @Override - public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) { + public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) { + // do nothing + } + + @Override + public void simpleValidate(WorkflowNodeInfoDO node) { // 判断对应的任务是否存在 JobInfoDO job = jobInfoRepository.findById(node.getJobId()) .orElseThrow(() -> new PowerJobException("Illegal job node,specified job is not exist,node name : " + node.getNodeName())); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NestedWorkflowNodeValidator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NestedWorkflowNodeValidator.java index d0608656..683f4fde 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NestedWorkflowNodeValidator.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NestedWorkflowNodeValidator.java @@ -9,10 +9,13 @@ import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; +import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO; import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; +import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository; import javax.annotation.Resource; import java.util.Objects; +import java.util.Optional; /** * @author Echo009 @@ -24,12 +27,19 @@ public class NestedWorkflowNodeValidator implements NodeValidator { @Resource private WorkflowInfoRepository workflowInfoRepository; + @Resource + private WorkflowNodeInfoRepository workflowNodeInfoRepository; + + @Override + public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) { + // do nothing + } @Override - public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) { + public void simpleValidate(WorkflowNodeInfoDO node) { // 判断对应工作流是否存在 - WorkflowInfoDO workflowInfo = workflowInfoRepository.findById(node.getWfId()) + WorkflowInfoDO workflowInfo = workflowInfoRepository.findById(node.getJobId()) .orElseThrow(() -> new PowerJobException("Illegal nested workflow node,specified workflow is not exist,node name : " + node.getNodeName())); if (workflowInfo.getStatus() == SwitchableStatus.DELETED.getV()) { throw new PowerJobException("Illegal nested workflow node,specified workflow has been deleted,node name : " + node.getNodeName()); @@ -37,7 +47,13 @@ public class NestedWorkflowNodeValidator implements NodeValidator { // 不允许多层嵌套,即 嵌套工作流节点引用的工作流中不能包含嵌套节点 PEWorkflowDAG peDag = JSON.parseObject(workflowInfo.getPeDAG(), PEWorkflowDAG.class); for (PEWorkflowDAG.Node peDagNode : peDag.getNodes()) { - if (Objects.equals(peDagNode.getNodeType(), WorkflowNodeType.NESTED_WORKFLOW.getCode())) { + // + final Optional nestWfNodeOp = workflowNodeInfoRepository.findById(peDagNode.getNodeId()); + if (!nestWfNodeOp.isPresent()) { + // 嵌套的工作流无效,缺失节点元数据 + throw new PowerJobException("Illegal nested workflow node,specified workflow is invalidate,node name : " + node.getNodeName()); + } + if (Objects.equals(nestWfNodeOp.get().getType(), WorkflowNodeType.NESTED_WORKFLOW.getCode())) { throw new PowerJobException("Illegal nested workflow node,specified workflow must be a simple workflow,node name : " + node.getNodeName()); } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NodeValidator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NodeValidator.java index d35b0f82..3624c175 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NodeValidator.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NodeValidator.java @@ -1,8 +1,8 @@ package tech.powerjob.server.core.validator; import tech.powerjob.common.enums.WorkflowNodeType; -import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; +import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO; /** * @author Echo009 @@ -10,11 +10,17 @@ import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; */ public interface NodeValidator { /** - * 校验工作流节点是否合法 + * 校验工作流节点(校验拓扑关系等) * @param node 节点 * @param dag dag */ - void validate(PEWorkflowDAG.Node node, WorkflowDAG dag); + void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag); + + /** + * 校验工作流节点 + * @param node 节点 + */ + void simpleValidate(WorkflowNodeInfoDO node); /** * 匹配的节点类型 diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java index 33fa51e0..89845846 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java @@ -133,29 +133,29 @@ public class WorkflowInstanceManager { private void initNodeInfo(PEWorkflowDAG dag) { for (PEWorkflowDAG.Node node : dag.getNodes()) { WorkflowNodeInfoDO workflowNodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_NODE)); - // 任务节点,需初始化 是否启用、是否允许失败跳过、节点参数 等信息 - if (workflowNodeInfo.getType() == null || workflowNodeInfo.getType() == WorkflowNodeType.JOB.getCode()) { + if (workflowNodeInfo.getType() == null) { + // 前向兼容 + workflowNodeInfo.setType(WorkflowNodeType.JOB.getCode()); + } + // 填充基础信息 + node.setNodeType(workflowNodeInfo.getType()) + .setJobId(workflowNodeInfo.getJobId()) + .setNodeName(workflowNodeInfo.getNodeName()) + .setEnable(workflowNodeInfo.getEnable()) + .setSkipWhenFailed(workflowNodeInfo.getSkipWhenFailed()); + + // 任务节点,初始化节点参数时需要特殊处理 + if (node.getNodeType() == WorkflowNodeType.JOB.getCode()) { // 任务节点缺失任务信息 if (workflowNodeInfo.getJobId() == null) { throw new PowerJobException(SystemInstanceResult.ILLEGAL_NODE); } JobInfoDO jobInfo = jobInfoRepository.findById(workflowNodeInfo.getJobId()).orElseThrow(() -> new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_JOB)); - - node.setNodeType(WorkflowNodeType.JOB.getCode()); - // 初始化任务相关信息 - node.setJobId(workflowNodeInfo.getJobId()) - .setNodeName(workflowNodeInfo.getNodeName()) - .setEnable(workflowNodeInfo.getEnable()) - .setSkipWhenFailed(workflowNodeInfo.getSkipWhenFailed()); - if (!StringUtils.isBlank(workflowNodeInfo.getNodeParams())) { node.setNodeParams(workflowNodeInfo.getNodeParams()); } else { node.setNodeParams(jobInfo.getJobParams()); } - } else { - // 非任务节点 - node.setNodeType(workflowNodeInfo.getType()); } } } @@ -427,7 +427,7 @@ public class WorkflowInstanceManager { updateWorkflowContext(wfInstance.getParentWfInstanceId(),wfContext); } // 处理父工作流 - move(wfInstance.getWfInstanceId(),wfInstance.getParentWfInstanceId(), StatusMappingHelper.toInstanceStatus(workflowInstanceStatus),result); + move(wfInstance.getParentWfInstanceId(), wfInstance.getWfInstanceId(), StatusMappingHelper.toInstanceStatus(workflowInstanceStatus), result); } // 报警 diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java index 530e0977..b619e4d5 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java @@ -8,9 +8,8 @@ import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; -import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.request.http.SaveWorkflowNodeRequest; import tech.powerjob.common.request.http.SaveWorkflowRequest; @@ -21,10 +20,8 @@ import tech.powerjob.server.common.utils.CronExpression; import tech.powerjob.server.core.service.NodeValidateService; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; -import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO; -import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository; import tech.powerjob.server.remote.server.redirector.DesignateServer; @@ -53,8 +50,6 @@ public class WorkflowService { @Resource private WorkflowNodeInfoRepository workflowNodeInfoRepository; @Resource - private JobInfoRepository jobInfoRepository; - @Resource private NodeValidateService nodeValidateService; /** @@ -134,7 +129,7 @@ public class WorkflowService { if (!wfId.equals(nodeInfo.getWorkflowId())) { throw new PowerJobException("can't use another workflow's node"); } - nodeValidateService.validate(node,complexDag); + nodeValidateService.complexValidate(nodeInfo, complexDag); // 只保存节点的 ID 信息,清空其他信息 newNodes.add(new PEWorkflowDAG.Node(node.getNodeId())); nodeIdList.add(node.getNodeId()); @@ -313,20 +308,9 @@ public class WorkflowService { workflowNodeInfo = new WorkflowNodeInfoDO(); workflowNodeInfo.setGmtCreate(new Date()); } - - // valid job info - if (req.getType() == WorkflowNodeType.JOB) { - JobInfoDO jobInfoDO = jobInfoRepository.findById(req.getJobId()).orElseThrow(() -> new IllegalArgumentException("can't find job by id: " + req.getJobId())); - if (!jobInfoDO.getAppId().equals(appId)) { - throw new PowerJobException("Permission Denied! can't use other app's job!"); - } - if (StringUtils.isEmpty(workflowNodeInfo.getNodeName())) { - workflowNodeInfo.setNodeName(jobInfoDO.getJobName()); - } - } - BeanUtils.copyProperties(req, workflowNodeInfo); - workflowNodeInfo.setType(req.getType().getCode()); + workflowNodeInfo.setType(req.getType()); + nodeValidateService.simpleValidate(workflowNodeInfo); workflowNodeInfo.setGmtModified(new Date()); workflowNodeInfo = workflowNodeInfoRepository.saveAndFlush(workflowNodeInfo); res.add(workflowNodeInfo); @@ -359,7 +343,6 @@ public class WorkflowService { if (nodeInfo != null) { node.setNodeType(nodeInfo.getType()) .setJobId(nodeInfo.getJobId()) - .setWfId(nodeInfo.getId()) .setEnable(nodeInfo.getEnable()) .setSkipWhenFailed(nodeInfo.getSkipWhenFailed()) .setNodeName(nodeInfo.getNodeName()) diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java index 93afe9d9..de7d593e 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java @@ -39,13 +39,13 @@ public class NestedWorkflowNodeHandler implements TaskNodeHandler { @Override public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) { // check - Long wfId = node.getWfId(); + Long wfId = node.getJobId(); WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null); if (targetWf == null || targetWf.getStatus() == SwitchableStatus.DELETED.getV()) { if (targetWf == null) { - log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getWfId()); + log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId()); } else { - log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) has been deleted!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getWfId()); + log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) has been deleted!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId()); } throw new PowerJobException("invalid nested workflow node," + node.getNodeId()); } @@ -71,7 +71,7 @@ public class NestedWorkflowNodeHandler implements TaskNodeHandler { @Override public void startTaskInstance(PEWorkflowDAG.Node node) { - Long wfId = node.getWfId(); + Long wfId = node.getJobId(); WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null); workflowInstanceManager.start(targetWf, node.getInstanceId()); } diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/validator/NodeValidatorTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/validator/NodeValidatorTest.java index eb32a0af..0a330e1e 100644 --- a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/validator/NodeValidatorTest.java +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/validator/NodeValidatorTest.java @@ -2,11 +2,13 @@ package tech.powerjob.server.core.validator; import org.junit.Assert; import org.junit.jupiter.api.Test; +import org.springframework.beans.BeanUtils; import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; +import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO; import static tech.powerjob.server.core.data.DataConstructUtil.*; @@ -35,7 +37,11 @@ class NodeValidatorTest { PEWorkflowDAG.Edge edge2_4 = new PEWorkflowDAG.Edge(2L, 4L); addEdges(peWorkflowDAG, edge1_2, edge1_3, edge2_4); WorkflowDAG dag = WorkflowDAGUtils.convert(peWorkflowDAG); - Assert.assertThrows(PowerJobException.class, () -> decisionNodeValidator.validate(node1, dag)); + + final WorkflowNodeInfoDO workflowNodeInfo1 = new WorkflowNodeInfoDO(); + BeanUtils.copyProperties(node1, workflowNodeInfo1); + workflowNodeInfo1.setId(node1.getNodeId()); + Assert.assertThrows(PowerJobException.class, () -> decisionNodeValidator.complexValidate(workflowNodeInfo1, dag)); }