diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/NodeValidateService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/NodeValidateService.java new file mode 100644 index 00000000..70ae40a4 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/NodeValidateService.java @@ -0,0 +1,45 @@ +package tech.powerjob.server.core.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.server.core.validator.NodeValidator; +import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; + +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +/** + * @author Echo009 + * @since 2021/12/14 + */ +@Service +@Slf4j +public class NodeValidateService { + + private final Map nodeValidatorMap; + + public NodeValidateService(List nodeValidators) { + nodeValidatorMap = new EnumMap<>(WorkflowNodeType.class); + nodeValidators.forEach(e -> nodeValidatorMap.put(e.matchingType(), e)); + } + + + public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) { + Integer nodeTypeCode = node.getNodeType(); + if (nodeTypeCode == null) { + // 前向兼容,默认为 任务节点 + nodeTypeCode = WorkflowNodeType.JOB.getCode(); + } + WorkflowNodeType nodeType = WorkflowNodeType.of(nodeTypeCode); + NodeValidator nodeValidator = nodeValidatorMap.get(nodeType); + if (nodeValidator == null) { + // 默认不需要校验 + return; + } + nodeValidator.validate(node, dag); + } + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/DecisionNodeValidator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/DecisionNodeValidator.java new file mode 100644 index 00000000..1e8abfcc --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/DecisionNodeValidator.java @@ -0,0 +1,65 @@ +package tech.powerjob.server.core.validator; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; +import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; + +import java.util.Collection; + +/** + * @author Echo009 + * @since 2021/12/14 + */ +@Component +@Slf4j +public class DecisionNodeValidator implements NodeValidator { + + + @Override + public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) { + // 简单校验 + String nodeParams = node.getNodeParams(); + if (StringUtils.isBlank(nodeParams)) { + throw new PowerJobException("DecisionNode‘s param must be not null,node name : " + node.getNodeName()); + } + // 出度固定为 2 + WorkflowDAG.Node nodeWrapper = dag.getNode(node.getNodeId()); + Collection edges = nodeWrapper.getSuccessorEdgeMap().values(); + if (edges.size() != 2) { + throw new PowerJobException("DecisionNode‘s out-degree must be 2,node name : " + node.getNodeName()); + } + // 边的属性必须为 ture 或者 false + boolean containFalse = false; + boolean containTrue = false; + for (PEWorkflowDAG.Edge edge : edges) { + if (!isValidBooleanStr(edge.getProperty())) { + throw new PowerJobException("Illegal property of DecisionNode‘s out-degree edge,node name : " + node.getNodeName()); + } + boolean b = Boolean.parseBoolean(edge.getProperty()); + if (b) { + containTrue = true; + } else { + containFalse = true; + } + } + if (!containFalse || !containTrue) { + throw new PowerJobException("Illegal property of DecisionNode‘s out-degree edge,node name : " + node.getNodeName()); + } + + } + + + public static boolean isValidBooleanStr(String str) { + return StringUtils.equalsIgnoreCase(str.trim(), "true") || StringUtils.equalsIgnoreCase(str.trim(), "false"); + } + + + @Override + public WorkflowNodeType matchingType() { + return WorkflowNodeType.DECISION; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/JobNodeValidator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/JobNodeValidator.java new file mode 100644 index 00000000..cc056e1b --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/JobNodeValidator.java @@ -0,0 +1,41 @@ +package tech.powerjob.server.core.validator; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; +import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; + +import javax.annotation.Resource; + +/** + * @author Echo009 + * @since 2021/12/14 + */ +@Component +@Slf4j +public class JobNodeValidator implements NodeValidator { + + @Resource + private JobInfoRepository jobInfoRepository; + + @Override + public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) { + // 判断对应的任务是否存在 + JobInfoDO job = jobInfoRepository.findById(node.getJobId()) + .orElseThrow(() -> new PowerJobException("Illegal job node,specified job is not exist,node name : " + node.getNodeName())); + + if (job.getStatus() == SwitchableStatus.DELETED.getV()) { + throw new PowerJobException("Illegal job node,specified job has been deleted,node name : " + node.getNodeName()); + } + } + + @Override + public WorkflowNodeType matchingType() { + return WorkflowNodeType.JOB; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NestedWorkflowNodeValidator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NestedWorkflowNodeValidator.java new file mode 100644 index 00000000..d0608656 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NestedWorkflowNodeValidator.java @@ -0,0 +1,50 @@ +package tech.powerjob.server.core.validator; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; +import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; +import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; + +import javax.annotation.Resource; +import java.util.Objects; + +/** + * @author Echo009 + * @since 2021/12/14 + */ +@Component +@Slf4j +public class NestedWorkflowNodeValidator implements NodeValidator { + + @Resource + private WorkflowInfoRepository workflowInfoRepository; + + + @Override + public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) { + // 判断对应工作流是否存在 + WorkflowInfoDO workflowInfo = workflowInfoRepository.findById(node.getWfId()) + .orElseThrow(() -> new PowerJobException("Illegal nested workflow node,specified workflow is not exist,node name : " + node.getNodeName())); + if (workflowInfo.getStatus() == SwitchableStatus.DELETED.getV()) { + throw new PowerJobException("Illegal nested workflow node,specified workflow has been deleted,node name : " + node.getNodeName()); + } + // 不允许多层嵌套,即 嵌套工作流节点引用的工作流中不能包含嵌套节点 + PEWorkflowDAG peDag = JSON.parseObject(workflowInfo.getPeDAG(), PEWorkflowDAG.class); + for (PEWorkflowDAG.Node peDagNode : peDag.getNodes()) { + if (Objects.equals(peDagNode.getNodeType(), WorkflowNodeType.NESTED_WORKFLOW.getCode())) { + throw new PowerJobException("Illegal nested workflow node,specified workflow must be a simple workflow,node name : " + node.getNodeName()); + } + } + } + + @Override + public WorkflowNodeType matchingType() { + return WorkflowNodeType.NESTED_WORKFLOW; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NodeValidator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NodeValidator.java new file mode 100644 index 00000000..d35b0f82 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NodeValidator.java @@ -0,0 +1,25 @@ +package tech.powerjob.server.core.validator; + +import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; + +/** + * @author Echo009 + * @since 2021/12/14 + */ +public interface NodeValidator { + /** + * 校验工作流节点是否合法 + * @param node 节点 + * @param dag dag + */ + void validate(PEWorkflowDAG.Node node, WorkflowDAG dag); + + /** + * 匹配的节点类型 + * @return node type + */ + WorkflowNodeType matchingType(); + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java index 73e9373c..33fa51e0 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java @@ -16,6 +16,7 @@ import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.core.helper.StatusMappingHelper; import tech.powerjob.server.core.lock.UseSegmentLock; @@ -296,6 +297,7 @@ public class WorkflowInstanceManager { if (instanceId.equals(node.getInstanceId())) { node.setStatus(status.getV()); node.setResult(result); + node.setFinishedTime(CommonUtils.formatTime(System.currentTimeMillis())); instanceNode = node; log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getNodeId(), node.getJobId(), instanceId, status.name(), result); } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java index ff891c33..530e0977 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java @@ -18,6 +18,8 @@ import tech.powerjob.server.common.SJ; import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; import tech.powerjob.server.common.utils.CronExpression; +import tech.powerjob.server.core.service.NodeValidateService; +import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; @@ -52,6 +54,8 @@ public class WorkflowService { private WorkflowNodeInfoRepository workflowNodeInfoRepository; @Resource private JobInfoRepository jobInfoRepository; + @Resource + private NodeValidateService nodeValidateService; /** * 保存/修改工作流信息 @@ -118,6 +122,7 @@ public class WorkflowService { // 其中 jobId,jobName 均以节点中的信息为准 List nodeIdList = Lists.newArrayList(); List newNodes = Lists.newArrayList(); + WorkflowDAG complexDag = WorkflowDAGUtils.convert(dag); for (PEWorkflowDAG.Node node : dag.getNodes()) { WorkflowNodeInfoDO nodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new PowerJobException("can't find node info by id :" + node.getNodeId())); // 更新工作流 ID @@ -129,6 +134,7 @@ public class WorkflowService { if (!wfId.equals(nodeInfo.getWorkflowId())) { throw new PowerJobException("can't use another workflow's node"); } + nodeValidateService.validate(node,complexDag); // 只保存节点的 ID 信息,清空其他信息 newNodes.add(new PEWorkflowDAG.Node(node.getNodeId())); nodeIdList.add(node.getNodeId()); @@ -353,6 +359,7 @@ public class WorkflowService { if (nodeInfo != null) { node.setNodeType(nodeInfo.getType()) .setJobId(nodeInfo.getJobId()) + .setWfId(nodeInfo.getId()) .setEnable(nodeInfo.getEnable()) .setSkipWhenFailed(nodeInfo.getSkipWhenFailed()) .setNodeName(nodeInfo.getNodeName()) diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/validator/NodeValidatorTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/validator/NodeValidatorTest.java new file mode 100644 index 00000000..eb32a0af --- /dev/null +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/validator/NodeValidatorTest.java @@ -0,0 +1,43 @@ +package tech.powerjob.server.core.validator; + +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; +import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; + +import static tech.powerjob.server.core.data.DataConstructUtil.*; + +/** + * @author Echo009 + * @since 2021/12/14 + */ +class NodeValidatorTest { + + private final DecisionNodeValidator decisionNodeValidator = new DecisionNodeValidator(); + + @Test + void testDecisionNodeValidator() { + + PEWorkflowDAG peWorkflowDAG = constructEmptyDAG(); + PEWorkflowDAG.Node node1 = new PEWorkflowDAG.Node(1L, WorkflowNodeType.DECISION.getCode()); + // decision node return true + node1.setNodeParams("true;"); + PEWorkflowDAG.Node node2 = new PEWorkflowDAG.Node(2L); + PEWorkflowDAG.Node node3 = new PEWorkflowDAG.Node(3L); + PEWorkflowDAG.Node node4 = new PEWorkflowDAG.Node(4L); + addNodes(peWorkflowDAG, node1, node2, node3, node4); + + PEWorkflowDAG.Edge edge1_2 = new PEWorkflowDAG.Edge(1L, 2L, "z"); + PEWorkflowDAG.Edge edge1_3 = new PEWorkflowDAG.Edge(1L, 3L, "true"); + PEWorkflowDAG.Edge edge2_4 = new PEWorkflowDAG.Edge(2L, 4L); + addEdges(peWorkflowDAG, edge1_2, edge1_3, edge2_4); + WorkflowDAG dag = WorkflowDAGUtils.convert(peWorkflowDAG); + Assert.assertThrows(PowerJobException.class, () -> decisionNodeValidator.validate(node1, dag)); + + } + + +}