fix: workflow node validator

This commit is contained in:
Echo009 2022-01-15 15:52:59 +08:00
parent 4b14be8321
commit 9194641c6f
12 changed files with 108 additions and 78 deletions

View File

@ -71,18 +71,18 @@ class TestWorkflow extends ClientInitializer {
SaveWorkflowNodeRequest saveWorkflowNodeRequest1 = new SaveWorkflowNodeRequest();
saveWorkflowNodeRequest1.setJobId(1L);
saveWorkflowNodeRequest1.setNodeName("DAG-Node-1");
saveWorkflowNodeRequest1.setType(WorkflowNodeType.JOB);
saveWorkflowNodeRequest1.setType(WorkflowNodeType.JOB.getCode());
SaveWorkflowNodeRequest saveWorkflowNodeRequest2 = new SaveWorkflowNodeRequest();
saveWorkflowNodeRequest2.setJobId(1L);
saveWorkflowNodeRequest2.setNodeName("DAG-Node-2");
saveWorkflowNodeRequest2.setType(WorkflowNodeType.JOB);
saveWorkflowNodeRequest2.setType(WorkflowNodeType.JOB.getCode());
SaveWorkflowNodeRequest saveWorkflowNodeRequest3 = new SaveWorkflowNodeRequest();
saveWorkflowNodeRequest3.setJobId(1L);
saveWorkflowNodeRequest3.setNodeName("DAG-Node-3");
saveWorkflowNodeRequest3.setType(WorkflowNodeType.JOB);
saveWorkflowNodeRequest3.setType(WorkflowNodeType.JOB.getCode());
List<WorkflowNodeInfoDTO> nodeList = powerJobClient.saveWorkflowNode(Lists.newArrayList(saveWorkflowNodeRequest1,saveWorkflowNodeRequest2,saveWorkflowNodeRequest3)).getData();

View File

@ -55,14 +55,11 @@ public class PEWorkflowDAG implements Serializable {
*/
private Integer nodeType;
/**
* job id
*/
private Long jobId;
/**
* workflow id,support for nested workflow
* job id or workflow id (if this Node type is a nested workflow)
*
* @see WorkflowNodeType#NESTED_WORKFLOW
*/
private Long wfId;
private Long jobId;
/**
* node name
*/

View File

@ -1,8 +1,8 @@
package tech.powerjob.common.request.http;
import lombok.Data;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.utils.CommonUtils;
import lombok.Data;
@ -22,7 +22,7 @@ public class SaveWorkflowNodeRequest {
/**
* 节点类型(默认为任务节点)
*/
private WorkflowNodeType type = WorkflowNodeType.JOB;
private Integer type;
/**
* 任务 ID
*/
@ -44,10 +44,11 @@ public class SaveWorkflowNodeRequest {
*/
private Boolean skipWhenFailed = false;
public void valid(){
public void valid() {
CommonUtils.requireNonNull(this.appId, "appId can't be empty");
CommonUtils.requireNonNull(this.type, "type can't be empty");
if (type == WorkflowNodeType.JOB) {
final WorkflowNodeType workflowNodeType = WorkflowNodeType.of(type);
if (workflowNodeType == WorkflowNodeType.JOB || workflowNodeType == WorkflowNodeType.NESTED_WORKFLOW) {
CommonUtils.requireNonNull(this.jobId, "jobId can't be empty");
}
}

View File

@ -3,9 +3,9 @@ package tech.powerjob.server.core.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.core.validator.NodeValidator;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import java.util.EnumMap;
import java.util.List;
@ -27,19 +27,30 @@ public class NodeValidateService {
}
public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) {
Integer nodeTypeCode = node.getNodeType();
if (nodeTypeCode == null) {
// 前向兼容默认为 任务节点
nodeTypeCode = WorkflowNodeType.JOB.getCode();
}
WorkflowNodeType nodeType = WorkflowNodeType.of(nodeTypeCode);
NodeValidator nodeValidator = nodeValidatorMap.get(nodeType);
public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) {
NodeValidator nodeValidator = getNodeValidator(node);
if (nodeValidator == null) {
// 默认不需要校验
return;
}
nodeValidator.validate(node, dag);
nodeValidator.complexValidate(node, dag);
}
public void simpleValidate(WorkflowNodeInfoDO node) {
NodeValidator nodeValidator = getNodeValidator(node);
if (nodeValidator == null) {
// 默认不需要校验
return;
}
nodeValidator.simpleValidate(node);
}
private NodeValidator getNodeValidator(WorkflowNodeInfoDO node) {
Integer nodeTypeCode = node.getType();
if (nodeTypeCode == null) {
// 前向兼容默认为 任务节点
return nodeValidatorMap.get(WorkflowNodeType.JOB);
}
return nodeValidatorMap.get(WorkflowNodeType.of(nodeTypeCode));
}
}

View File

@ -7,6 +7,7 @@ import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import java.util.Collection;
@ -20,14 +21,9 @@ public class DecisionNodeValidator implements NodeValidator {
@Override
public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) {
// 简单校验
String nodeParams = node.getNodeParams();
if (StringUtils.isBlank(nodeParams)) {
throw new PowerJobException("DecisionNodes param must be not null,node name : " + node.getNodeName());
}
public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) {
// 出度固定为 2
WorkflowDAG.Node nodeWrapper = dag.getNode(node.getNodeId());
WorkflowDAG.Node nodeWrapper = dag.getNode(node.getId());
Collection<PEWorkflowDAG.Edge> edges = nodeWrapper.getSuccessorEdgeMap().values();
if (edges.size() != 2) {
throw new PowerJobException("DecisionNodes out-degree must be 2,node name : " + node.getNodeName());
@ -52,6 +48,15 @@ public class DecisionNodeValidator implements NodeValidator {
}
@Override
public void simpleValidate(WorkflowNodeInfoDO node) {
// 简单校验
String nodeParams = node.getNodeParams();
if (StringUtils.isBlank(nodeParams)) {
throw new PowerJobException("DecisionNodes param must be not null,node name : " + node.getNodeName());
}
}
public static boolean isValidBooleanStr(String str) {
return StringUtils.equalsIgnoreCase(str.trim(), "true") || StringUtils.equalsIgnoreCase(str.trim(), "false");

View File

@ -4,10 +4,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import javax.annotation.Resource;
@ -24,7 +24,12 @@ public class JobNodeValidator implements NodeValidator {
private JobInfoRepository jobInfoRepository;
@Override
public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) {
public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) {
// do nothing
}
@Override
public void simpleValidate(WorkflowNodeInfoDO node) {
// 判断对应的任务是否存在
JobInfoDO job = jobInfoRepository.findById(node.getJobId())
.orElseThrow(() -> new PowerJobException("Illegal job node,specified job is not exist,node name : " + node.getNodeName()));

View File

@ -9,10 +9,13 @@ import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.Optional;
/**
* @author Echo009
@ -24,12 +27,19 @@ public class NestedWorkflowNodeValidator implements NodeValidator {
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Resource
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
@Override
public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) {
// do nothing
}
@Override
public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) {
public void simpleValidate(WorkflowNodeInfoDO node) {
// 判断对应工作流是否存在
WorkflowInfoDO workflowInfo = workflowInfoRepository.findById(node.getWfId())
WorkflowInfoDO workflowInfo = workflowInfoRepository.findById(node.getJobId())
.orElseThrow(() -> new PowerJobException("Illegal nested workflow node,specified workflow is not exist,node name : " + node.getNodeName()));
if (workflowInfo.getStatus() == SwitchableStatus.DELETED.getV()) {
throw new PowerJobException("Illegal nested workflow node,specified workflow has been deleted,node name : " + node.getNodeName());
@ -37,7 +47,13 @@ public class NestedWorkflowNodeValidator implements NodeValidator {
// 不允许多层嵌套 嵌套工作流节点引用的工作流中不能包含嵌套节点
PEWorkflowDAG peDag = JSON.parseObject(workflowInfo.getPeDAG(), PEWorkflowDAG.class);
for (PEWorkflowDAG.Node peDagNode : peDag.getNodes()) {
if (Objects.equals(peDagNode.getNodeType(), WorkflowNodeType.NESTED_WORKFLOW.getCode())) {
//
final Optional<WorkflowNodeInfoDO> nestWfNodeOp = workflowNodeInfoRepository.findById(peDagNode.getNodeId());
if (!nestWfNodeOp.isPresent()) {
// 嵌套的工作流无效缺失节点元数据
throw new PowerJobException("Illegal nested workflow node,specified workflow is invalidate,node name : " + node.getNodeName());
}
if (Objects.equals(nestWfNodeOp.get().getType(), WorkflowNodeType.NESTED_WORKFLOW.getCode())) {
throw new PowerJobException("Illegal nested workflow node,specified workflow must be a simple workflow,node name : " + node.getNodeName());
}
}

View File

@ -1,8 +1,8 @@
package tech.powerjob.server.core.validator;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
/**
* @author Echo009
@ -10,11 +10,17 @@ import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
*/
public interface NodeValidator {
/**
* 校验工作流节点是否合法
* 校验工作流节点校验拓扑关系等
* @param node 节点
* @param dag dag
*/
void validate(PEWorkflowDAG.Node node, WorkflowDAG dag);
void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag);
/**
* 校验工作流节点
* @param node 节点
*/
void simpleValidate(WorkflowNodeInfoDO node);
/**
* 匹配的节点类型

View File

@ -133,29 +133,29 @@ public class WorkflowInstanceManager {
private void initNodeInfo(PEWorkflowDAG dag) {
for (PEWorkflowDAG.Node node : dag.getNodes()) {
WorkflowNodeInfoDO workflowNodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_NODE));
// 任务节点需初始化 是否启用是否允许失败跳过节点参数 等信息
if (workflowNodeInfo.getType() == null || workflowNodeInfo.getType() == WorkflowNodeType.JOB.getCode()) {
if (workflowNodeInfo.getType() == null) {
// 前向兼容
workflowNodeInfo.setType(WorkflowNodeType.JOB.getCode());
}
// 填充基础信息
node.setNodeType(workflowNodeInfo.getType())
.setJobId(workflowNodeInfo.getJobId())
.setNodeName(workflowNodeInfo.getNodeName())
.setEnable(workflowNodeInfo.getEnable())
.setSkipWhenFailed(workflowNodeInfo.getSkipWhenFailed());
// 任务节点初始化节点参数时需要特殊处理
if (node.getNodeType() == WorkflowNodeType.JOB.getCode()) {
// 任务节点缺失任务信息
if (workflowNodeInfo.getJobId() == null) {
throw new PowerJobException(SystemInstanceResult.ILLEGAL_NODE);
}
JobInfoDO jobInfo = jobInfoRepository.findById(workflowNodeInfo.getJobId()).orElseThrow(() -> new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_JOB));
node.setNodeType(WorkflowNodeType.JOB.getCode());
// 初始化任务相关信息
node.setJobId(workflowNodeInfo.getJobId())
.setNodeName(workflowNodeInfo.getNodeName())
.setEnable(workflowNodeInfo.getEnable())
.setSkipWhenFailed(workflowNodeInfo.getSkipWhenFailed());
if (!StringUtils.isBlank(workflowNodeInfo.getNodeParams())) {
node.setNodeParams(workflowNodeInfo.getNodeParams());
} else {
node.setNodeParams(jobInfo.getJobParams());
}
} else {
// 非任务节点
node.setNodeType(workflowNodeInfo.getType());
}
}
}
@ -427,7 +427,7 @@ public class WorkflowInstanceManager {
updateWorkflowContext(wfInstance.getParentWfInstanceId(),wfContext);
}
// 处理父工作流
move(wfInstance.getWfInstanceId(),wfInstance.getParentWfInstanceId(), StatusMappingHelper.toInstanceStatus(workflowInstanceStatus),result);
move(wfInstance.getParentWfInstanceId(), wfInstance.getWfInstanceId(), StatusMappingHelper.toInstanceStatus(workflowInstanceStatus), result);
}
// 报警

View File

@ -8,9 +8,8 @@ import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
import tech.powerjob.common.request.http.SaveWorkflowRequest;
@ -21,10 +20,8 @@ import tech.powerjob.server.common.utils.CronExpression;
import tech.powerjob.server.core.service.NodeValidateService;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
@ -53,8 +50,6 @@ public class WorkflowService {
@Resource
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private NodeValidateService nodeValidateService;
/**
@ -134,7 +129,7 @@ public class WorkflowService {
if (!wfId.equals(nodeInfo.getWorkflowId())) {
throw new PowerJobException("can't use another workflow's node");
}
nodeValidateService.validate(node,complexDag);
nodeValidateService.complexValidate(nodeInfo, complexDag);
// 只保存节点的 ID 信息清空其他信息
newNodes.add(new PEWorkflowDAG.Node(node.getNodeId()));
nodeIdList.add(node.getNodeId());
@ -313,20 +308,9 @@ public class WorkflowService {
workflowNodeInfo = new WorkflowNodeInfoDO();
workflowNodeInfo.setGmtCreate(new Date());
}
// valid job info
if (req.getType() == WorkflowNodeType.JOB) {
JobInfoDO jobInfoDO = jobInfoRepository.findById(req.getJobId()).orElseThrow(() -> new IllegalArgumentException("can't find job by id: " + req.getJobId()));
if (!jobInfoDO.getAppId().equals(appId)) {
throw new PowerJobException("Permission Denied! can't use other app's job!");
}
if (StringUtils.isEmpty(workflowNodeInfo.getNodeName())) {
workflowNodeInfo.setNodeName(jobInfoDO.getJobName());
}
}
BeanUtils.copyProperties(req, workflowNodeInfo);
workflowNodeInfo.setType(req.getType().getCode());
workflowNodeInfo.setType(req.getType());
nodeValidateService.simpleValidate(workflowNodeInfo);
workflowNodeInfo.setGmtModified(new Date());
workflowNodeInfo = workflowNodeInfoRepository.saveAndFlush(workflowNodeInfo);
res.add(workflowNodeInfo);
@ -359,7 +343,6 @@ public class WorkflowService {
if (nodeInfo != null) {
node.setNodeType(nodeInfo.getType())
.setJobId(nodeInfo.getJobId())
.setWfId(nodeInfo.getId())
.setEnable(nodeInfo.getEnable())
.setSkipWhenFailed(nodeInfo.getSkipWhenFailed())
.setNodeName(nodeInfo.getNodeName())

View File

@ -39,13 +39,13 @@ public class NestedWorkflowNodeHandler implements TaskNodeHandler {
@Override
public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
// check
Long wfId = node.getWfId();
Long wfId = node.getJobId();
WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null);
if (targetWf == null || targetWf.getStatus() == SwitchableStatus.DELETED.getV()) {
if (targetWf == null) {
log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getWfId());
log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId());
} else {
log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) has been deleted!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getWfId());
log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) has been deleted!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId());
}
throw new PowerJobException("invalid nested workflow node," + node.getNodeId());
}
@ -71,7 +71,7 @@ public class NestedWorkflowNodeHandler implements TaskNodeHandler {
@Override
public void startTaskInstance(PEWorkflowDAG.Node node) {
Long wfId = node.getWfId();
Long wfId = node.getJobId();
WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null);
workflowInstanceManager.start(targetWf, node.getInstanceId());
}

View File

@ -2,11 +2,13 @@ package tech.powerjob.server.core.validator;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.springframework.beans.BeanUtils;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import static tech.powerjob.server.core.data.DataConstructUtil.*;
@ -35,7 +37,11 @@ class NodeValidatorTest {
PEWorkflowDAG.Edge edge2_4 = new PEWorkflowDAG.Edge(2L, 4L);
addEdges(peWorkflowDAG, edge1_2, edge1_3, edge2_4);
WorkflowDAG dag = WorkflowDAGUtils.convert(peWorkflowDAG);
Assert.assertThrows(PowerJobException.class, () -> decisionNodeValidator.validate(node1, dag));
final WorkflowNodeInfoDO workflowNodeInfo1 = new WorkflowNodeInfoDO();
BeanUtils.copyProperties(node1, workflowNodeInfo1);
workflowNodeInfo1.setId(node1.getNodeId());
Assert.assertThrows(PowerJobException.class, () -> decisionNodeValidator.complexValidate(workflowNodeInfo1, dag));
}