diff --git a/powerjob-client/src/test/java/tech/powerjob/client/test/TestWorkflow.java b/powerjob-client/src/test/java/tech/powerjob/client/test/TestWorkflow.java index 222dd8d6..c403a048 100644 --- a/powerjob-client/src/test/java/tech/powerjob/client/test/TestWorkflow.java +++ b/powerjob-client/src/test/java/tech/powerjob/client/test/TestWorkflow.java @@ -70,20 +70,17 @@ class TestWorkflow extends ClientInitializer { // 创建节点 SaveWorkflowNodeRequest saveWorkflowNodeRequest1 = new SaveWorkflowNodeRequest(); saveWorkflowNodeRequest1.setJobId(1L); - saveWorkflowNodeRequest1.setWorkflowId(req.getId()); saveWorkflowNodeRequest1.setNodeName("DAG-Node-1"); saveWorkflowNodeRequest1.setType(WorkflowNodeType.JOB); SaveWorkflowNodeRequest saveWorkflowNodeRequest2 = new SaveWorkflowNodeRequest(); saveWorkflowNodeRequest2.setJobId(1L); - saveWorkflowNodeRequest2.setWorkflowId(req.getId()); saveWorkflowNodeRequest2.setNodeName("DAG-Node-2"); saveWorkflowNodeRequest2.setType(WorkflowNodeType.JOB); SaveWorkflowNodeRequest saveWorkflowNodeRequest3 = new SaveWorkflowNodeRequest(); saveWorkflowNodeRequest3.setJobId(1L); - saveWorkflowNodeRequest3.setWorkflowId(req.getId()); saveWorkflowNodeRequest3.setNodeName("DAG-Node-3"); saveWorkflowNodeRequest3.setType(WorkflowNodeType.JOB); @@ -97,9 +94,9 @@ class TestWorkflow extends ClientInitializer { List nodes = Lists.newLinkedList(); List edges = Lists.newLinkedList(); - nodes.add(new PEWorkflowDAG.Node(nodeList.get(0).getId(), 1L, "DAG-Node-1")); - nodes.add(new PEWorkflowDAG.Node(nodeList.get(1).getId(), 1L, "DAG-Node-2")); - nodes.add(new PEWorkflowDAG.Node(nodeList.get(2).getId(), 1L, "DAG-Node-3")); + nodes.add(new PEWorkflowDAG.Node(nodeList.get(0).getId())); + nodes.add(new PEWorkflowDAG.Node(nodeList.get(1).getId())); + nodes.add(new PEWorkflowDAG.Node(nodeList.get(2).getId())); edges.add(new PEWorkflowDAG.Edge(nodeList.get(0).getId(), nodeList.get(1).getId())); edges.add(new PEWorkflowDAG.Edge(nodeList.get(1).getId(), nodeList.get(2).getId())); diff --git a/powerjob-common/src/main/java/tech/powerjob/common/SystemInstanceResult.java b/powerjob-common/src/main/java/tech/powerjob/common/SystemInstanceResult.java index 78469240..0050e1fc 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/SystemInstanceResult.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/SystemInstanceResult.java @@ -45,6 +45,8 @@ public class SystemInstanceResult { public static final String MIDDLE_JOB_FAILED = "middle job failed"; public static final String MIDDLE_JOB_STOPPED = "middle job stopped by user"; public static final String CAN_NOT_FIND_JOB = "can't find some job"; + public static final String CAN_NOT_FIND_NODE = "can't find some node"; + public static final String ILLEGAL_NODE = "illegal node info"; /** * 没有启用的节点 diff --git a/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java b/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java index 406ffacc..61167661 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java @@ -45,15 +45,22 @@ public class PEWorkflowDAG implements Serializable { * @since 20210128 */ private Long nodeId; + /* Instance running param, which is not required by DAG. */ + /** + * note type + * @since 20210316 + */ + private Integer nodeType; + /** + * job id + */ private Long jobId; /** * node name */ private String nodeName; - /* Instance running param, which is not required by DAG. */ - @JsonSerialize(using= ToStringSerializer.class) private Long instanceId; @@ -69,11 +76,8 @@ public class PEWorkflowDAG implements Serializable { private Boolean skipWhenFailed; - - public Node(Long nodeId,Long jobId, String nodeName) { + public Node(Long nodeId) { this.nodeId = nodeId; - this.jobId = jobId; - this.nodeName = nodeName; } } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveWorkflowNodeRequest.java b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveWorkflowNodeRequest.java index b3dbbd5b..986fdd83 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveWorkflowNodeRequest.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveWorkflowNodeRequest.java @@ -7,20 +7,18 @@ import lombok.Data; /** - * 新增工作流节点信息请求 + * 保存工作流节点信息请求 + * 工作流节点的 * * @author zenggonggu * @since 2021/02/02 */ @Data - public class SaveWorkflowNodeRequest { private Long id; private Long appId; - - private Long workflowId; /** * 节点类型(默认为任务节点) */ diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java index b86d12bb..4424b7c5 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java @@ -14,6 +14,7 @@ import tech.powerjob.common.WorkflowContextConstant; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.enums.WorkflowInstanceStatus; +import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.utils.JsonUtils; import tech.powerjob.server.common.constants.SwitchableStatus; @@ -83,8 +84,79 @@ public class WorkflowInstanceManager { Long wfId = wfInfo.getId(); Long wfInstanceId = idGenerateService.allocate(); + // 构造实例信息 + WorkflowInstanceInfoDO newWfInstance = constructWfInstance(wfInfo, initParams, expectTriggerTime, wfId, wfInstanceId); + + PEWorkflowDAG dag; + try { + dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); + // 校验 DAG 信息 + if (!WorkflowDAGUtils.valid(dag)) { + log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you has modified the DAG info directly in database!", wfId, wfInstanceId); + throw new PowerJobException(SystemInstanceResult.INVALID_DAG); + } + // 初始化节点信息 + initNodeInfo(dag); + newWfInstance.setDag(JSON.toJSONString(dag)); + // 最后检查工作流中的任务是否均处于可用状态(没有被删除) + Set allJobIds = Sets.newHashSet(); + dag.getNodes().forEach(node -> { + allJobIds.add(node.getJobId()); + // 将节点的初始状态置为等待派发 + node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); + }); + int needNum = allJobIds.size(); + long dbNum = jobInfoRepository.countByAppIdAndStatusInAndIdIn(wfInfo.getAppId(), Sets.newHashSet(SwitchableStatus.ENABLE.getV(), SwitchableStatus.DISABLE.getV()), allJobIds); + log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum); + if (dbNum < allJobIds.size()) { + log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", wfId, wfInstanceId, needNum, dbNum); + throw new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_JOB); + } + workflowInstanceInfoRepository.saveAndFlush(newWfInstance); + } catch (Exception e) { + onWorkflowInstanceFailed(e.getMessage(), newWfInstance); + } + return wfInstanceId; + } + + /** + * 初始化节点信息 + */ + private void initNodeInfo(PEWorkflowDAG dag) { + for (PEWorkflowDAG.Node node : dag.getNodes()) { + WorkflowNodeInfoDO workflowNodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(()->new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_NODE)); + // 任务节点,需初始化 是否启用、是否允许失败跳过、节点参数 等信息 + if (workflowNodeInfo.getType() == null || workflowNodeInfo.getType() == WorkflowNodeType.JOB.getCode()) { + // 任务节点缺失任务信息 + if (workflowNodeInfo.getJobId() == null) { + throw new PowerJobException(SystemInstanceResult.ILLEGAL_NODE); + } + JobInfoDO jobInfo = jobInfoRepository.findById(workflowNodeInfo.getJobId()).orElseThrow(()->new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_JOB)); + + node.setNodeType(WorkflowNodeType.JOB.getCode()); + // 初始化任务相关信息 + node.setJobId(workflowNodeInfo.getJobId()) + .setNodeName(workflowNodeInfo.getNodeName()) + .setEnable(workflowNodeInfo.getEnable()) + .setSkipWhenFailed(workflowNodeInfo.getSkipWhenFailed()); + + if (!StringUtils.isBlank(workflowNodeInfo.getNodeParams())) { + node.setNodeParams(workflowNodeInfo.getNodeParams()); + } else { + node.setNodeParams(jobInfo.getJobParams()); + } + } else { + // 非任务节点 + node.setNodeType(workflowNodeInfo.getType()); + } + } + } + + /** + * 构造工作流实例,并初始化基础信息(不包括 DAG ) + */ + private WorkflowInstanceInfoDO constructWfInstance(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime, Long wfId, Long wfInstanceId) { - // 创建 并初始化 DAG 信息 Date now = new Date(); WorkflowInstanceInfoDO newWfInstance = new WorkflowInstanceInfoDO(); newWfInstance.setAppId(wfInfo.getAppId()); @@ -101,43 +173,7 @@ public class WorkflowInstanceManager { newWfInstance.setGmtCreate(now); newWfInstance.setGmtModified(now); - - // 校验 DAG 信息 - PEWorkflowDAG dag; - try { - dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); - // 校验 - if (!WorkflowDAGUtils.valid(dag)) { - throw new PowerJobException(SystemInstanceResult.INVALID_DAG); - } - } catch (Exception e) { - log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you has modified the DAG info directly in database!", wfId, wfInstanceId); - onWorkflowInstanceFailed(SystemInstanceResult.INVALID_DAG, newWfInstance); - return newWfInstance.getWfInstanceId(); - } - // 校验合法性(工作是否存在且启用) - Set allJobIds = Sets.newHashSet(); - dag.getNodes().forEach(node -> { - allJobIds.add(node.getJobId()); - // 将节点的初始状态置为等待派发 - node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); - }); - int needNum = allJobIds.size(); - // 检查工作流中的任务是否均处于可用状态(没有被删除) - long dbNum = jobInfoRepository.countByAppIdAndStatusInAndIdIn(wfInfo.getAppId(), Sets.newHashSet(SwitchableStatus.ENABLE.getV(), SwitchableStatus.DISABLE.getV()), allJobIds); - log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum); - // 先 set 一次,异常的话直接存这个信息 - newWfInstance.setDag(JSON.toJSONString(dag)); - if (dbNum < allJobIds.size()) { - log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", wfId, wfInstanceId, needNum, dbNum); - onWorkflowInstanceFailed(SystemInstanceResult.CAN_NOT_FIND_JOB, newWfInstance); - } else { - initNodeInfo(dag); - // 再 set 一次,此时工作流中的节点信息已经完全初始化 - newWfInstance.setDag(JSON.toJSONString(dag)); - workflowInstanceInfoRepository.saveAndFlush(newWfInstance); - } - return wfInstanceId; + return newWfInstance; } /** @@ -373,41 +409,6 @@ public class WorkflowInstanceManager { } - /** - * 初始化节点信息 - * - * @param dag pe dag - * @since 20210205 - */ - private void initNodeInfo(PEWorkflowDAG dag) { - // 初始化节点信息(是否启用、是否允许失败跳过、节点参数) - for (PEWorkflowDAG.Node node : dag.getNodes()) { - Optional nodeInfoOpt = workflowNodeInfoRepository.findById(node.getNodeId()); - // 不考虑极端情况 - JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new); - - if (!nodeInfoOpt.isPresent()) { - // 默认启用 + 不允许失败跳过 - node.setEnable(true) - .setSkipWhenFailed(false) - .setNodeParams(jobInfo.getJobParams()); - } else { - WorkflowNodeInfoDO nodeInfo = nodeInfoOpt.get(); - // 使用节点别名覆盖 - node.setNodeName(nodeInfo.getNodeName()) - .setEnable(nodeInfo.getEnable()) - .setSkipWhenFailed(nodeInfo.getSkipWhenFailed()); - // 如果节点中指定了参数信息,则取节点的,否则取 Job 上的 - if (!StringUtils.isBlank(nodeInfo.getNodeParams())) { - node.setNodeParams(nodeInfo.getNodeParams()); - } else { - node.setNodeParams(jobInfo.getJobParams()); - } - - } - } - } - /** * 运行任务实例 * 需要将创建和运行任务实例分离,否则在秒失败情况下,会发生DAG覆盖更新的问题 diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java index 849d7fc5..57f05347 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java @@ -1,11 +1,6 @@ package tech.powerjob.server.core.workflow; import com.alibaba.fastjson.JSON; -import tech.powerjob.common.PowerJobException; -import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.common.model.PEWorkflowDAG; -import tech.powerjob.common.request.http.SaveWorkflowNodeRequest; -import tech.powerjob.common.request.http.SaveWorkflowRequest; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; @@ -13,6 +8,11 @@ import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; +import tech.powerjob.common.PowerJobException; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.common.request.http.SaveWorkflowNodeRequest; +import tech.powerjob.common.request.http.SaveWorkflowRequest; import tech.powerjob.server.common.SJ; import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; @@ -30,7 +30,6 @@ import javax.annotation.Resource; import javax.transaction.Transactional; import java.text.ParseException; import java.util.*; -import java.util.stream.Collectors; /** * Workflow 服务 @@ -98,8 +97,7 @@ public class WorkflowService { wf = workflowInfoRepository.saveAndFlush(wf); wfId = wf.getId(); } - - wf.setPeDAG(validateAndConvert2String(wfId,req.getDag())); + wf.setPeDAG(validateAndConvert2String(wfId, req.getDag())); workflowInfoRepository.saveAndFlush(wf); return wfId; } @@ -108,7 +106,7 @@ public class WorkflowService { * 保存 DAG 信息 * 这里会物理删除游离的节点信息 */ - private String validateAndConvert2String(Long wfId,PEWorkflowDAG dag) { + private String validateAndConvert2String(Long wfId, PEWorkflowDAG dag) { if (dag == null || CollectionUtils.isEmpty(dag.getNodes())) { return "{}"; } @@ -118,6 +116,7 @@ public class WorkflowService { // 注意:这里只会保存图相关的基础信息,nodeId,jobId,jobName(nodeAlias) // 其中 jobId,jobName 均以节点中的信息为准 List nodeIdList = Lists.newArrayList(); + List newNodes = Lists.newArrayList(); for (PEWorkflowDAG.Node node : dag.getNodes()) { WorkflowNodeInfoDO nodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new PowerJobException("can't find node info by id :" + node.getNodeId())); // 更新工作流 ID @@ -129,15 +128,13 @@ public class WorkflowService { if (!wfId.equals(nodeInfo.getWorkflowId())) { throw new PowerJobException("can't use another workflow's node"); } - // 节点中的名称信息一定是非空的 - node.setNodeName(nodeInfo.getNodeName()).setJobId(nodeInfo.getJobId()); - // 清空其他信息 - node.setEnable(null).setSkipWhenFailed(null).setInstanceId(null).setResult(null); + // 只保存节点的 ID 信息,清空其他信息 + newNodes.add(new PEWorkflowDAG.Node(node.getNodeId())); nodeIdList.add(node.getNodeId()); } - + dag.setNodes(newNodes); int deleteCount = workflowNodeInfoRepository.deleteByWorkflowIdAndIdNotIn(wfId, nodeIdList); - log.warn("[WorkflowService-{}]delete {} dissociative nodes of workflow",wfId, deleteCount); + log.warn("[WorkflowService-{}]delete {} dissociative nodes of workflow", wfId, deleteCount); return JSON.toJSONString(dag); } @@ -295,15 +292,13 @@ public class WorkflowService { if (CollectionUtils.isEmpty(workflowNodeRequestList)) { return Collections.emptyList(); } - validate(workflowNodeRequestList); + final Long appId = workflowNodeRequestList.get(0).getAppId(); List res = new ArrayList<>(workflowNodeRequestList.size()); - // 记录变更过任务的节点 - List changeJobNodeList = new ArrayList<>(workflowNodeRequestList.size()); - // - Long wfId = null; for (SaveWorkflowNodeRequest req : workflowNodeRequestList) { - if (req.getWorkflowId() != null) { - wfId = req.getWorkflowId(); + req.valid(); + // 必须位于同一个 APP 下 + if (!appId.equals(req.getAppId())) { + throw new PowerJobException("node list must are in the same app"); } WorkflowNodeInfoDO workflowNodeInfo; if (req.getId() != null) { @@ -313,11 +308,11 @@ public class WorkflowService { workflowNodeInfo.setGmtCreate(new Date()); } JobInfoDO jobInfoDO = jobInfoRepository.findById(req.getJobId()).orElseThrow(() -> new IllegalArgumentException("can't find job by id: " + req.getJobId())); - // 变更任务的节点 - if (workflowNodeInfo.getJobId() != null && !workflowNodeInfo.getJobId().equals(req.getJobId())) { - changeJobNodeList.add(workflowNodeInfo); + if (!jobInfoDO.getAppId().equals(appId)) { + throw new PowerJobException("Permission Denied! can't use other app's job!"); } BeanUtils.copyProperties(req, workflowNodeInfo); + workflowNodeInfo.setType(req.getType().getCode()); // 如果名称为空则默认取任务名称 if (StringUtils.isEmpty(workflowNodeInfo.getNodeName())) { workflowNodeInfo.setNodeName(jobInfoDO.getJobName()); @@ -326,73 +321,9 @@ public class WorkflowService { workflowNodeInfo = workflowNodeInfoRepository.saveAndFlush(workflowNodeInfo); res.add(workflowNodeInfo); } - // 同步变更 DAG 中的任务信息 - if (wfId != null && !changeJobNodeList.isEmpty()) { - updateDagJobInfo(changeJobNodeList, wfId); - } return res; } - private void updateDagJobInfo(List changeJobNodeList, Long wfId) { - WorkflowInfoDO workflowInfo = workflowInfoRepository.findById(wfId).orElseGet(WorkflowInfoDO::new); - PEWorkflowDAG dag = JSON.parseObject(workflowInfo.getPeDAG(), PEWorkflowDAG.class); - if (!CollectionUtils.isEmpty(dag.getNodes())) { - Map nodeId2NodeMap = dag.getNodes().stream().collect(Collectors.toMap(PEWorkflowDAG.Node::getNodeId, e -> e)); - for (WorkflowNodeInfoDO nodeInfo : changeJobNodeList) { - PEWorkflowDAG.Node node = nodeId2NodeMap.get(nodeInfo.getId()); - if (node != null) { - node.setJobId(nodeInfo.getJobId()); - } - } - } - workflowInfo.setPeDAG(JSON.toJSONString(dag)); - workflowInfo.setGmtModified(new Date()); - workflowInfoRepository.saveAndFlush(workflowInfo); - } - - /** - * 校验合法性 - * - 必须在同一个 APP 下 - * - 工作流 ID 只能都非空,或者都为 null - * - 对应的 job 信息是否存在 - * - * @param workflowNodeRequestList 非空工作流节点列表 - */ - @SuppressWarnings("squid:S3776") - private void validate(List workflowNodeRequestList) { - Long appId = workflowNodeRequestList.get(0).getAppId(); - Long wfId = null; - for (SaveWorkflowNodeRequest saveWorkflowNodeRequest : workflowNodeRequestList) { - saveWorkflowNodeRequest.valid(); - // 必须位于同一个 APP 下 - if (!appId.equals(saveWorkflowNodeRequest.getAppId())) { - throw new PowerJobException("node list must are in the same app"); - } - if (saveWorkflowNodeRequest.getWorkflowId() != null) { - if (wfId == null) { - wfId = saveWorkflowNodeRequest.getWorkflowId(); - } else { - // 工作流的 ID 必须一致 - if (!wfId.equals(saveWorkflowNodeRequest.getWorkflowId())) { - throw new PowerJobException("node list must are in the same workflow"); - } - } - } else { - // 存在部分节点有工作流 ID ,部分没有 - if (wfId != null) { - throw new PowerJobException("can't create and update node info in the same request"); - } - } - JobInfoDO jobInfo = jobInfoRepository.findById(saveWorkflowNodeRequest.getJobId()).orElseThrow(() -> new IllegalArgumentException("can't find job by id: " + saveWorkflowNodeRequest.getJobId())); - if (!jobInfo.getAppId().equals(appId)) { - throw new PowerJobException("Permission Denied! can't use other app's job!"); - } - } - if (wfId != null) { - permissionCheck(wfId, appId); - } - } - private void fillWorkflow(WorkflowInfoDO wfInfo) { diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/DAGTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/DAGTest.java index 0aff9d18..6ab1ca80 100644 --- a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/DAGTest.java +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/DAGTest.java @@ -30,8 +30,8 @@ public class DAGTest { List edges = Lists.newLinkedList(); // 测试图1: 1 -> 2 -> 1,理论上报错 - nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1")); - nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2")); + nodes.add(new PEWorkflowDAG.Node(1L)); + nodes.add(new PEWorkflowDAG.Node(2L)); edges.add(new PEWorkflowDAG.Edge(1L, 2L)); edges.add(new PEWorkflowDAG.Edge(2L, 1L)); Assert.assertFalse(WorkflowDAGUtils.valid(new PEWorkflowDAG(nodes, edges))); @@ -43,10 +43,10 @@ public class DAGTest { List nodes = Lists.newLinkedList(); List edges = Lists.newLinkedList(); - nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1")); - nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2")); - nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3")); - nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4")); + nodes.add(new PEWorkflowDAG.Node(1L)); + nodes.add(new PEWorkflowDAG.Node(2L)); + nodes.add(new PEWorkflowDAG.Node(3L)); + nodes.add(new PEWorkflowDAG.Node(4L)); edges.add(new PEWorkflowDAG.Edge(1L, 2L)); edges.add(new PEWorkflowDAG.Edge(1L, 3L)); edges.add(new PEWorkflowDAG.Edge(2L, 4L)); @@ -78,10 +78,10 @@ public class DAGTest { List nodes = Lists.newLinkedList(); List edges = Lists.newLinkedList(); - nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1")); - nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2")); - nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3")); - nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4")); + nodes.add(new PEWorkflowDAG.Node(1L)); + nodes.add(new PEWorkflowDAG.Node(2L)); + nodes.add(new PEWorkflowDAG.Node(3L)); + nodes.add(new PEWorkflowDAG.Node(4L)); edges.add(new PEWorkflowDAG.Edge(1L, 3L)); edges.add(new PEWorkflowDAG.Edge(2L, 4L)); @@ -102,10 +102,10 @@ public class DAGTest { List edges = Lists.newLinkedList(); // 测试图4:(双顶点 单个环) 1 -> 3 -> 1, 2 -> 4 - nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1")); - nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2")); - nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3")); - nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4")); + nodes.add(new PEWorkflowDAG.Node(1L)); + nodes.add(new PEWorkflowDAG.Node(2L)); + nodes.add(new PEWorkflowDAG.Node(3L)); + nodes.add(new PEWorkflowDAG.Node(4L)); edges.add(new PEWorkflowDAG.Edge(1L, 3L)); edges.add(new PEWorkflowDAG.Edge(3L, 1L)); edges.add(new PEWorkflowDAG.Edge(2L, 4L)); @@ -130,12 +130,12 @@ public class DAGTest { List nodes = Lists.newLinkedList(); List edges = Lists.newLinkedList(); - nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1")); - nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2")); - nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3")); - nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4")); - nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5")); - nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6")); + nodes.add(new PEWorkflowDAG.Node(1L)); + nodes.add(new PEWorkflowDAG.Node(2L)); + nodes.add(new PEWorkflowDAG.Node(3L)); + nodes.add(new PEWorkflowDAG.Node(4L)); + nodes.add(new PEWorkflowDAG.Node(5L)); + nodes.add(new PEWorkflowDAG.Node(6L)); edges.add(new PEWorkflowDAG.Edge(1L, 2L)); edges.add(new PEWorkflowDAG.Edge(2L, 5L)); edges.add(new PEWorkflowDAG.Edge(5L, 6L)); @@ -167,15 +167,15 @@ public class DAGTest { List nodes1 = Lists.newLinkedList(); List edges1 = Lists.newLinkedList(); - nodes1.add(new PEWorkflowDAG.Node(1L, 1L, "1")); - nodes1.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false)); - nodes1.add(new PEWorkflowDAG.Node(3L, 3L, "3")); - nodes1.add(new PEWorkflowDAG.Node(4L, 4L, "4")); - nodes1.add(new PEWorkflowDAG.Node(5L, 5L, "5")); - nodes1.add(new PEWorkflowDAG.Node(6L, 6L, "6").setEnable(false)); - nodes1.add(new PEWorkflowDAG.Node(7L, 7L, "7").setEnable(false)); - nodes1.add(new PEWorkflowDAG.Node(8L, 8L, "8").setEnable(false)); - nodes1.add(new PEWorkflowDAG.Node(9L, 9L, "9")); + nodes1.add(new PEWorkflowDAG.Node(1L)); + nodes1.add(new PEWorkflowDAG.Node(2L).setEnable(false)); + nodes1.add(new PEWorkflowDAG.Node(3L)); + nodes1.add(new PEWorkflowDAG.Node(4L)); + nodes1.add(new PEWorkflowDAG.Node(5L)); + nodes1.add(new PEWorkflowDAG.Node(6L).setEnable(false)); + nodes1.add(new PEWorkflowDAG.Node(7L).setEnable(false)); + nodes1.add(new PEWorkflowDAG.Node(8L).setEnable(false)); + nodes1.add(new PEWorkflowDAG.Node(9L)); edges1.add(new PEWorkflowDAG.Edge(1L, 3L)); edges1.add(new PEWorkflowDAG.Edge(2L, 4L)); edges1.add(new PEWorkflowDAG.Edge(4L, 5L)); @@ -209,12 +209,12 @@ public class DAGTest { List nodes = Lists.newLinkedList(); List edges = Lists.newLinkedList(); - nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setEnable(false)); - nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false)); - nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3")); - nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4")); - nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5").setEnable(false)); - nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6")); + nodes.add(new PEWorkflowDAG.Node(1L).setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(2L).setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(3L)); + nodes.add(new PEWorkflowDAG.Node(4L)); + nodes.add(new PEWorkflowDAG.Node(5L).setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(6L)); edges.add(new PEWorkflowDAG.Edge(1L, 2L)); edges.add(new PEWorkflowDAG.Edge(2L, 3L)); edges.add(new PEWorkflowDAG.Edge(3L, 4L)); @@ -248,13 +248,13 @@ public class DAGTest { List nodes = Lists.newLinkedList(); List edges = Lists.newLinkedList(); - nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setStatus(InstanceStatus.FAILED.getV())); - nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false)); - nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3").setStatus(InstanceStatus.SUCCEED.getV())); - nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4")); - nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5").setEnable(false)); - nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6")); - nodes.add(new PEWorkflowDAG.Node(7L, 7L, "7")); + nodes.add(new PEWorkflowDAG.Node(1L).setStatus(InstanceStatus.FAILED.getV())); + nodes.add(new PEWorkflowDAG.Node(2L).setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(3L).setStatus(InstanceStatus.SUCCEED.getV())); + nodes.add(new PEWorkflowDAG.Node(4L)); + nodes.add(new PEWorkflowDAG.Node(5L).setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(6L)); + nodes.add(new PEWorkflowDAG.Node(7L)); edges.add(new PEWorkflowDAG.Edge(1L, 2L)); edges.add(new PEWorkflowDAG.Edge(2L, 4L)); edges.add(new PEWorkflowDAG.Edge(3L, 4L)); @@ -291,12 +291,12 @@ public class DAGTest { List nodes = Lists.newLinkedList(); List edges = Lists.newLinkedList(); - nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setStatus(InstanceStatus.FAILED.getV())); - nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false)); - nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3").setEnable(false)); - nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4").setEnable(false)); - nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5")); - nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6")); + nodes.add(new PEWorkflowDAG.Node(1L).setStatus(InstanceStatus.FAILED.getV())); + nodes.add(new PEWorkflowDAG.Node(2L).setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(3L).setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(4L).setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(5L)); + nodes.add(new PEWorkflowDAG.Node(6L)); edges.add(new PEWorkflowDAG.Edge(1L, 2L)); edges.add(new PEWorkflowDAG.Edge(2L, 5L)); edges.add(new PEWorkflowDAG.Edge(5L, 6L)); @@ -334,13 +334,13 @@ public class DAGTest { List nodes = Lists.newLinkedList(); List edges = Lists.newLinkedList(); - nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setStatus(InstanceStatus.FAILED.getV())); - nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false)); - nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3").setEnable(false)); - nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4").setEnable(false)); - nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5")); - nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6")); - nodes.add(new PEWorkflowDAG.Node(7L, 7L, "7")); + nodes.add(new PEWorkflowDAG.Node(1L).setStatus(InstanceStatus.FAILED.getV())); + nodes.add(new PEWorkflowDAG.Node(2L).setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(3L).setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(4L).setEnable(false)); + nodes.add(new PEWorkflowDAG.Node(5L)); + nodes.add(new PEWorkflowDAG.Node(6L)); + nodes.add(new PEWorkflowDAG.Node(7L)); edges.add(new PEWorkflowDAG.Edge(1L, 2L)); edges.add(new PEWorkflowDAG.Edge(2L, 5L)); edges.add(new PEWorkflowDAG.Edge(5L, 6L));