feat: update the workflow maintenance interface

This commit is contained in:
Echo009 2021-12-24 18:10:48 +08:00
parent d996b34a54
commit 4b14be8321
8 changed files with 278 additions and 0 deletions

View File

@ -0,0 +1,45 @@
package tech.powerjob.server.core.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.core.validator.NodeValidator;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
/**
* @author Echo009
* @since 2021/12/14
*/
@Service
@Slf4j
public class NodeValidateService {
private final Map<WorkflowNodeType, NodeValidator> nodeValidatorMap;
public NodeValidateService(List<NodeValidator> nodeValidators) {
nodeValidatorMap = new EnumMap<>(WorkflowNodeType.class);
nodeValidators.forEach(e -> nodeValidatorMap.put(e.matchingType(), e));
}
public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) {
Integer nodeTypeCode = node.getNodeType();
if (nodeTypeCode == null) {
// 前向兼容默认为 任务节点
nodeTypeCode = WorkflowNodeType.JOB.getCode();
}
WorkflowNodeType nodeType = WorkflowNodeType.of(nodeTypeCode);
NodeValidator nodeValidator = nodeValidatorMap.get(nodeType);
if (nodeValidator == null) {
// 默认不需要校验
return;
}
nodeValidator.validate(node, dag);
}
}

View File

@ -0,0 +1,65 @@
package tech.powerjob.server.core.validator;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import java.util.Collection;
/**
* @author Echo009
* @since 2021/12/14
*/
@Component
@Slf4j
public class DecisionNodeValidator implements NodeValidator {
@Override
public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) {
// 简单校验
String nodeParams = node.getNodeParams();
if (StringUtils.isBlank(nodeParams)) {
throw new PowerJobException("DecisionNodes param must be not null,node name : " + node.getNodeName());
}
// 出度固定为 2
WorkflowDAG.Node nodeWrapper = dag.getNode(node.getNodeId());
Collection<PEWorkflowDAG.Edge> edges = nodeWrapper.getSuccessorEdgeMap().values();
if (edges.size() != 2) {
throw new PowerJobException("DecisionNodes out-degree must be 2,node name : " + node.getNodeName());
}
// 边的属性必须为 ture 或者 false
boolean containFalse = false;
boolean containTrue = false;
for (PEWorkflowDAG.Edge edge : edges) {
if (!isValidBooleanStr(edge.getProperty())) {
throw new PowerJobException("Illegal property of DecisionNodes out-degree edge,node name : " + node.getNodeName());
}
boolean b = Boolean.parseBoolean(edge.getProperty());
if (b) {
containTrue = true;
} else {
containFalse = true;
}
}
if (!containFalse || !containTrue) {
throw new PowerJobException("Illegal property of DecisionNodes out-degree edge,node name : " + node.getNodeName());
}
}
public static boolean isValidBooleanStr(String str) {
return StringUtils.equalsIgnoreCase(str.trim(), "true") || StringUtils.equalsIgnoreCase(str.trim(), "false");
}
@Override
public WorkflowNodeType matchingType() {
return WorkflowNodeType.DECISION;
}
}

View File

@ -0,0 +1,41 @@
package tech.powerjob.server.core.validator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import javax.annotation.Resource;
/**
* @author Echo009
* @since 2021/12/14
*/
@Component
@Slf4j
public class JobNodeValidator implements NodeValidator {
@Resource
private JobInfoRepository jobInfoRepository;
@Override
public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) {
// 判断对应的任务是否存在
JobInfoDO job = jobInfoRepository.findById(node.getJobId())
.orElseThrow(() -> new PowerJobException("Illegal job node,specified job is not exist,node name : " + node.getNodeName()));
if (job.getStatus() == SwitchableStatus.DELETED.getV()) {
throw new PowerJobException("Illegal job node,specified job has been deleted,node name : " + node.getNodeName());
}
}
@Override
public WorkflowNodeType matchingType() {
return WorkflowNodeType.JOB;
}
}

View File

@ -0,0 +1,50 @@
package tech.powerjob.server.core.validator;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import javax.annotation.Resource;
import java.util.Objects;
/**
* @author Echo009
* @since 2021/12/14
*/
@Component
@Slf4j
public class NestedWorkflowNodeValidator implements NodeValidator {
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Override
public void validate(PEWorkflowDAG.Node node, WorkflowDAG dag) {
// 判断对应工作流是否存在
WorkflowInfoDO workflowInfo = workflowInfoRepository.findById(node.getWfId())
.orElseThrow(() -> new PowerJobException("Illegal nested workflow node,specified workflow is not exist,node name : " + node.getNodeName()));
if (workflowInfo.getStatus() == SwitchableStatus.DELETED.getV()) {
throw new PowerJobException("Illegal nested workflow node,specified workflow has been deleted,node name : " + node.getNodeName());
}
// 不允许多层嵌套 嵌套工作流节点引用的工作流中不能包含嵌套节点
PEWorkflowDAG peDag = JSON.parseObject(workflowInfo.getPeDAG(), PEWorkflowDAG.class);
for (PEWorkflowDAG.Node peDagNode : peDag.getNodes()) {
if (Objects.equals(peDagNode.getNodeType(), WorkflowNodeType.NESTED_WORKFLOW.getCode())) {
throw new PowerJobException("Illegal nested workflow node,specified workflow must be a simple workflow,node name : " + node.getNodeName());
}
}
}
@Override
public WorkflowNodeType matchingType() {
return WorkflowNodeType.NESTED_WORKFLOW;
}
}

View File

@ -0,0 +1,25 @@
package tech.powerjob.server.core.validator;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
/**
* @author Echo009
* @since 2021/12/14
*/
public interface NodeValidator {
/**
* 校验工作流节点是否合法
* @param node 节点
* @param dag dag
*/
void validate(PEWorkflowDAG.Node node, WorkflowDAG dag);
/**
* 匹配的节点类型
* @return node type
*/
WorkflowNodeType matchingType();
}

View File

@ -16,6 +16,7 @@ import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.helper.StatusMappingHelper;
import tech.powerjob.server.core.lock.UseSegmentLock;
@ -296,6 +297,7 @@ public class WorkflowInstanceManager {
if (instanceId.equals(node.getInstanceId())) {
node.setStatus(status.getV());
node.setResult(result);
node.setFinishedTime(CommonUtils.formatTime(System.currentTimeMillis()));
instanceNode = node;
log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getNodeId(), node.getJobId(), instanceId, status.name(), result);
}

View File

@ -18,6 +18,8 @@ import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import tech.powerjob.server.common.utils.CronExpression;
import tech.powerjob.server.core.service.NodeValidateService;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
@ -52,6 +54,8 @@ public class WorkflowService {
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private NodeValidateService nodeValidateService;
/**
* 保存/修改工作流信息
@ -118,6 +122,7 @@ public class WorkflowService {
// 其中 jobIdjobName 均以节点中的信息为准
List<Long> nodeIdList = Lists.newArrayList();
List<PEWorkflowDAG.Node> newNodes = Lists.newArrayList();
WorkflowDAG complexDag = WorkflowDAGUtils.convert(dag);
for (PEWorkflowDAG.Node node : dag.getNodes()) {
WorkflowNodeInfoDO nodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new PowerJobException("can't find node info by id :" + node.getNodeId()));
// 更新工作流 ID
@ -129,6 +134,7 @@ public class WorkflowService {
if (!wfId.equals(nodeInfo.getWorkflowId())) {
throw new PowerJobException("can't use another workflow's node");
}
nodeValidateService.validate(node,complexDag);
// 只保存节点的 ID 信息清空其他信息
newNodes.add(new PEWorkflowDAG.Node(node.getNodeId()));
nodeIdList.add(node.getNodeId());
@ -353,6 +359,7 @@ public class WorkflowService {
if (nodeInfo != null) {
node.setNodeType(nodeInfo.getType())
.setJobId(nodeInfo.getJobId())
.setWfId(nodeInfo.getId())
.setEnable(nodeInfo.getEnable())
.setSkipWhenFailed(nodeInfo.getSkipWhenFailed())
.setNodeName(nodeInfo.getNodeName())

View File

@ -0,0 +1,43 @@
package tech.powerjob.server.core.validator;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import static tech.powerjob.server.core.data.DataConstructUtil.*;
/**
* @author Echo009
* @since 2021/12/14
*/
class NodeValidatorTest {
private final DecisionNodeValidator decisionNodeValidator = new DecisionNodeValidator();
@Test
void testDecisionNodeValidator() {
PEWorkflowDAG peWorkflowDAG = constructEmptyDAG();
PEWorkflowDAG.Node node1 = new PEWorkflowDAG.Node(1L, WorkflowNodeType.DECISION.getCode());
// decision node return true
node1.setNodeParams("true;");
PEWorkflowDAG.Node node2 = new PEWorkflowDAG.Node(2L);
PEWorkflowDAG.Node node3 = new PEWorkflowDAG.Node(3L);
PEWorkflowDAG.Node node4 = new PEWorkflowDAG.Node(4L);
addNodes(peWorkflowDAG, node1, node2, node3, node4);
PEWorkflowDAG.Edge edge1_2 = new PEWorkflowDAG.Edge(1L, 2L, "z");
PEWorkflowDAG.Edge edge1_3 = new PEWorkflowDAG.Edge(1L, 3L, "true");
PEWorkflowDAG.Edge edge2_4 = new PEWorkflowDAG.Edge(2L, 4L);
addEdges(peWorkflowDAG, edge1_2, edge1_3, edge2_4);
WorkflowDAG dag = WorkflowDAGUtils.convert(peWorkflowDAG);
Assert.assertThrows(PowerJobException.class, () -> decisionNodeValidator.validate(node1, dag));
}
}