diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
index 7caf90d5..7c0e6296 100644
--- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
+++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
@@ -360,49 +360,22 @@ public class OhMyClient {
     }
 
-    /**
-     * Save the workflow DAG
-     *
-     * @param request DAG of Workflow
-     * @return Standard return object
-     */
-    public ResultDTO<Void> saveWorkflowDag(SaveWorkflowDAGRequest request) {
-        request.setAppId(appId);
-        MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
-        String json = JsonUtils.toJSONStringUnsafe(request);
-        String post = postHA(OpenAPIConstant.SAVE_WORKFLOW_DAG, RequestBody.create(jsonType, json));
-        return JSON.parseObject(post, VOID_RESULT_TYPE);
-    }
-
     /**
      * Add workflow nodes
      *
      * @param requestList Node info list of Workflow
      * @return Standard return object
      */
-    public ResultDTO<List<WorkflowNodeInfoDTO>> addWorkflowNode(List<AddWorkflowNodeRequest> requestList) {
-        for (AddWorkflowNodeRequest addWorkflowNodeRequest : requestList) {
-            addWorkflowNodeRequest.setAppId(appId);
+    public ResultDTO<List<WorkflowNodeInfoDTO>> saveWorkflowNode(List<SaveWorkflowNodeRequest> requestList) {
+        for (SaveWorkflowNodeRequest saveWorkflowNodeRequest : requestList) {
+            saveWorkflowNodeRequest.setAppId(appId);
         }
         MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
         String json = JsonUtils.toJSONStringUnsafe(requestList);
-        String post = postHA(OpenAPIConstant.ADD_WORKFLOW_NODE, RequestBody.create(jsonType, json));
+        String post = postHA(OpenAPIConstant.SAVE_WORKFLOW_NODE, RequestBody.create(jsonType, json));
        return JSON.parseObject(post, WF_NODE_LIST_RESULT_TYPE);
     }
 
-    /**
-     * Modify a workflow node
-     *
-     * @param request Node info of Workflow
-     * @return Standard return object
-     */
-    public ResultDTO<Void> modifyWorkflowNode(ModifyWorkflowNodeRequest request) {
-        request.setAppId(appId);
-        MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
-        String json = JsonUtils.toJSONStringUnsafe(request);
-        String post = postHA(OpenAPIConstant.MODIFY_WORKFLOW_NODE, RequestBody.create(jsonType, json));
-        return JSON.parseObject(post, VOID_RESULT_TYPE);
-    }
 
     /**
diff --git a/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestWorkflow.java b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestWorkflow.java
index ccc74c94..3cfae51d 100644
--- a/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestWorkflow.java
+++ b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestWorkflow.java
@@ -62,6 +62,50 @@ class TestWorkflow extends ClientInitializer {
         System.out.println(res);
         Assertions.assertNotNull(res);
 
+        req.setId(res.getData());
+
+        // create the nodes
+        SaveWorkflowNodeRequest saveWorkflowNodeRequest1 = new SaveWorkflowNodeRequest();
+        saveWorkflowNodeRequest1.setJobId(1L);
+        saveWorkflowNodeRequest1.setWorkflowId(req.getId());
+        saveWorkflowNodeRequest1.setNodeName("DAG-Node-1");
+
+        SaveWorkflowNodeRequest saveWorkflowNodeRequest2 = new SaveWorkflowNodeRequest();
+        saveWorkflowNodeRequest2.setJobId(1L);
+        saveWorkflowNodeRequest2.setWorkflowId(req.getId());
+        saveWorkflowNodeRequest2.setNodeName("DAG-Node-2");
+
+
+        SaveWorkflowNodeRequest saveWorkflowNodeRequest3 = new SaveWorkflowNodeRequest();
+        saveWorkflowNodeRequest3.setJobId(1L);
+        saveWorkflowNodeRequest3.setWorkflowId(req.getId());
+        saveWorkflowNodeRequest3.setNodeName("DAG-Node-3");
+
+
+        List<WorkflowNodeInfoDTO> nodeList = ohMyClient.saveWorkflowNode(Lists.newArrayList(saveWorkflowNodeRequest1, saveWorkflowNodeRequest2, saveWorkflowNodeRequest3)).getData();
+        System.out.println(nodeList);
+        Assertions.assertNotNull(nodeList);
+
+
+        // DAG definition
+        List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
+        List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
+
+        nodes.add(new PEWorkflowDAG.Node(nodeList.get(0).getId(), 1L, "DAG-Node-1"));
+        nodes.add(new PEWorkflowDAG.Node(nodeList.get(1).getId(), 1L, "DAG-Node-2"));
+        nodes.add(new PEWorkflowDAG.Node(nodeList.get(2).getId(), 1L, "DAG-Node-3"));
+
+        edges.add(new PEWorkflowDAG.Edge(nodeList.get(0).getId(), nodeList.get(1).getId()));
+        edges.add(new PEWorkflowDAG.Edge(nodeList.get(1).getId(), nodeList.get(2).getId()));
+
+        PEWorkflowDAG peWorkflowDAG = new PEWorkflowDAG(nodes, edges);
+
+        // save the complete workflow info, now including the DAG
+        req.setDag(peWorkflowDAG);
+        res = ohMyClient.saveWorkflow(req);
+
+        System.out.println(res);
+        Assertions.assertNotNull(res);
+
     }
 
     @Test
@@ -71,52 +115,6 @@ class TestWorkflow extends ClientInitializer {
         Assertions.assertNotNull(res);
     }
 
-    @Test
-    void testAddWorkflowNode() {
-        AddWorkflowNodeRequest addWorkflowNodeRequest = new AddWorkflowNodeRequest();
-        addWorkflowNodeRequest.setJobId(1L);
-        addWorkflowNodeRequest.setWorkflowId(WF_ID);
-        ResultDTO<List<WorkflowNodeInfoDTO>> res = ohMyClient.addWorkflowNode(Lists.newArrayList(addWorkflowNodeRequest));
-        System.out.println(res);
-        Assertions.assertNotNull(res);
-    }
-
-    @Test
-    void testModifyWorkflowNode() {
-        ModifyWorkflowNodeRequest modifyWorkflowNodeRequest = new ModifyWorkflowNodeRequest();
-        modifyWorkflowNodeRequest.setWorkflowId(WF_ID);
-        modifyWorkflowNodeRequest.setId(1L);
-        modifyWorkflowNodeRequest.setNodeAlias("(๑•̀ㅂ•́)و✧");
-        modifyWorkflowNodeRequest.setEnable(false);
-        modifyWorkflowNodeRequest.setSkipWhenFailed(false);
-        ResultDTO<Void> res = ohMyClient.modifyWorkflowNode(modifyWorkflowNodeRequest);
-        System.out.println(res);
-        Assertions.assertNotNull(res);
-    }
-
-    @Test
-    void testSaveWorkflowDag() {
-        // DAG definition
-        List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
-        List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
-
-        nodes.add(new PEWorkflowDAG.Node(1L, 1L, "DAG-Node-1"));
-        nodes.add(new PEWorkflowDAG.Node(2L, 1L, "DAG-Node-2"));
-        nodes.add(new PEWorkflowDAG.Node(3L, 1L, "DAG-Node-3"));
-
-        edges.add(new PEWorkflowDAG.Edge(1L, 2L));
-        edges.add(new PEWorkflowDAG.Edge(2L, 3L));
-
-        PEWorkflowDAG peWorkflowDAG = new PEWorkflowDAG(nodes, edges);
-
-        SaveWorkflowDAGRequest saveWorkflowDAGRequest = new SaveWorkflowDAGRequest();
-        saveWorkflowDAGRequest.setId(WF_ID);
-        saveWorkflowDAGRequest.setDag(peWorkflowDAG);
-        ResultDTO<Void> res = ohMyClient.saveWorkflowDag(saveWorkflowDAGRequest);
-        System.out.println(res);
-        Assertions.assertNotNull(res);
-    }
-
     @Test
     void testDisableWorkflow() {
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OpenAPIConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OpenAPIConstant.java
index 45e17437..a581e229 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OpenAPIConstant.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OpenAPIConstant.java
@@ -46,9 +46,7 @@
     public static final String ENABLE_WORKFLOW = "/enableWorkflow";
     public static final String DELETE_WORKFLOW = "/deleteWorkflow";
     public static final String RUN_WORKFLOW = "/runWorkflow";
-    public static final String ADD_WORKFLOW_NODE = "/addWorkflowNode";
-    public static final String MODIFY_WORKFLOW_NODE = "/modifyWorkflowNode";
-    public static final String SAVE_WORKFLOW_DAG = "/saveWorkflowDAG";
+    public static final String SAVE_WORKFLOW_NODE = "/addWorkflowNode";
 
     /* ************* WorkflowInstance section ************* */
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java
index 8aceef46..2534c33c 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/PEWorkflowDAG.java
@@ -48,16 +48,16 @@ public class PEWorkflowDAG implements Serializable {
         private Long jobId;
 
         /**
-         * job alias or job name
+         * node name
         */
-        private String jobName;
+        private String nodeName;
 
         /* Instance running param, which is not required by DAG. */
 
         @JsonSerialize(using= ToStringSerializer.class)
         private Long instanceId;
 
-        private String jobParams;
+        private String nodeParams;
 
         private Integer status;
 
@@ -70,10 +70,10 @@ public class PEWorkflowDAG implements Serializable {
 
         private Boolean skipWhenFailed;
 
-        public Node(Long nodeId,Long jobId, String jobName) {
+        public Node(Long nodeId,Long jobId, String nodeName) {
             this.nodeId = nodeId;
             this.jobId = jobId;
-            this.jobName = jobName;
+            this.nodeName = nodeName;
         }
     }
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/ModifyWorkflowNodeRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/ModifyWorkflowNodeRequest.java
deleted file mode 100644
index 563c727a..00000000
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/ModifyWorkflowNodeRequest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.github.kfcfans.powerjob.common.request.http;
-
-import com.github.kfcfans.powerjob.common.utils.CommonUtils;
-import lombok.Data;
-
-/**
- * Request for modifying workflow node info
- *
- * @author zenggonggu
- * @since 2021/02/02
- */
-@Data
-public class ModifyWorkflowNodeRequest {
-
-    private Long id;
-
-    private Long appId;
-
-    private Long workflowId;
-
-    /**
-     * node alias; defaults to the corresponding job's name
-     */
-    private String nodeAlias;
-    /**
-     * node params
-     */
-    private String nodeParams;
-    /**
-     * whether the node is enabled
-     */
-    private Boolean enable;
-    /**
-     * whether the node may be skipped on failure
-     */
-    private Boolean skipWhenFailed;
-
-
-    public void valid(){
-        CommonUtils.requireNonNull(this.id, "id can't be empty");
-        CommonUtils.requireNonNull(this.appId, "appId can't be empty");
-        CommonUtils.requireNonNull(this.nodeAlias, "nodeAlias can't be empty");
-        CommonUtils.requireNonNull(this.enable, "enable can't be empty");
-        CommonUtils.requireNonNull(this.skipWhenFailed, "skipWhenFailed can't be empty");
-    }
-}
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowDAGRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowDAGRequest.java
deleted file mode 100644
index d7cbf8d2..00000000
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowDAGRequest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.github.kfcfans.powerjob.common.request.http;
-
-import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
-import com.github.kfcfans.powerjob.common.utils.CommonUtils;
-import lombok.Data;
-
-/**
- * Request for saving the workflow DAG info
- *
- * @author zenggonggu
- * @since 2021/02/02
- */
-@Data
-public class SaveWorkflowDAGRequest {
-    /** workflowId **/
-    private Long id;
-
-    /** owning appId (filled automatically; OpenClient users need not set it) */
-    private Long appId;
-
-    /** node-and-edge (DAG) representation */
-    private PEWorkflowDAG dag;
-
-    public void valid() {
-        CommonUtils.requireNonNull(this.appId, "appId can't be empty");
-        CommonUtils.requireNonNull(this.id, "workflowId can't be empty");
-        CommonUtils.requireNonNull(dag, "dag can't be empty");
-        CommonUtils.requireNonNull(dag.getNodes(), "nodes can't be empty");
-        CommonUtils.requireNonNull(dag.getEdges(), "edges can't be empty");
-        for (PEWorkflowDAG.Node node : dag.getNodes()) {
-            // clear other fields
-            node.setEnable(null).setSkipWhenFailed(null).setInstanceId(null).setJobName(null).setResult(null);
-        }
-    }
-}
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/AddWorkflowNodeRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowNodeRequest.java
similarity index 80%
rename from powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/AddWorkflowNodeRequest.java
rename to powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowNodeRequest.java
index bb054ed7..a15fbcb5 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/AddWorkflowNodeRequest.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowNodeRequest.java
@@ -12,11 +12,18 @@ import lombok.Data;
  * @since 2021/02/02
  */
 @Data
-public class AddWorkflowNodeRequest {
+
+public class SaveWorkflowNodeRequest {
+
+    private Long id;
 
     private Long appId;
 
     private Long workflowId;
+    /**
+     * node type
+     */
+    private int type = 1;
     /**
      * job ID
      */
@@ -24,7 +31,7 @@
     /**
      * node alias; defaults to the corresponding job's name
      */
-    private String nodeAlias;
+    private String nodeName;
     /**
      * node params
     */
@@ -42,5 +49,6 @@
         CommonUtils.requireNonNull(this.appId, "appId can't be empty");
         CommonUtils.requireNonNull(this.workflowId, "workflowId can't be empty");
         CommonUtils.requireNonNull(this.jobId, "jobId can't be empty");
+        CommonUtils.requireNonNull(this.type, "type can't be empty");
     }
 }
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java
index 782b90b1..02830925 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java
@@ -1,6 +1,7 @@
 package com.github.kfcfans.powerjob.common.request.http;
 
 import com.github.kfcfans.powerjob.common.TimeExpressionType;
+import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
 import com.github.kfcfans.powerjob.common.utils.CommonUtils;
 import com.google.common.collect.Lists;
 import lombok.Data;
@@ -59,6 +60,9 @@ public class SaveWorkflowRequest implements Serializable {
      */
     private List<Long> notifyUserIds = Lists.newLinkedList();
 
+    /** node-and-edge (DAG) representation */
+    private PEWorkflowDAG dag;
+
     public void valid() {
         CommonUtils.requireNonNull(wfName, "workflow name can't be empty");
         CommonUtils.requireNonNull(appId, "appId can't be empty");
diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/constants/NodeType.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/constants/NodeType.java
new file mode 100644
index 00000000..4a193889
--- /dev/null
+++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/constants/NodeType.java
@@ -0,0 +1,23 @@
+package tech.powerjob.server.common.constants;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * node type
+ *
+ * @author Echo009
+ * @since 2021/3/7
+ */
+@Getter
+@AllArgsConstructor
+public enum NodeType {
+    /**
+     * common node
+     */
+    COMMON(1);
+
+
+    private final int code;
+
+}
diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java
index 07bf5521..dffc5ac4 100644
--- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java
+++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java
@@ -184,7 +184,7 @@ public class WorkflowInstanceManager {
         readyNodes.forEach(readyNode -> {
             // Note: all task instances must be created successfully here; a partial failure would leave the DAG info not updated, and the already-created instance nodes could not be shown in the workflow log
             // instanceParam carries the wfContext of the workflow instance
-            Long instanceId = instanceService.create(readyNode.getJobId(), wfInfo.getAppId(), readyNode.getJobParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis());
+            Long instanceId = instanceService.create(readyNode.getJobId(), wfInfo.getAppId(), readyNode.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis());
             readyNode.setInstanceId(instanceId);
             readyNode.setStatus(InstanceStatus.RUNNING.getV());
 
@@ -321,7 +321,7 @@ public class WorkflowInstanceManager {
         for (PEWorkflowDAG.Node readyNode : readyNodes) {
             // Likewise: all task instances must be created successfully here, to avoid a partial failure leaving already-created instance nodes unable to show in the workflow log
             // instanceParam carries the wfContext of the workflow instance
-            Long newInstanceId = instanceService.create(readyNode.getJobId(), wfInstance.getAppId(), readyNode.getJobParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
+            Long newInstanceId = instanceService.create(readyNode.getJobId(), wfInstance.getAppId(), readyNode.getNodeParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
             readyNode.setInstanceId(newInstanceId);
             readyNode.setStatus(InstanceStatus.RUNNING.getV());
             log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), newInstanceId);
@@ -394,18 +394,18 @@
                 // enabled by default + failure skipping not allowed
                 node.setEnable(true)
                         .setSkipWhenFailed(false)
-                        .setJobParams(jobInfo.getJobParams());
+                        .setNodeParams(jobInfo.getJobParams());
             } else {
                 WorkflowNodeInfoDO nodeInfo = nodeInfoOpt.get();
                 // override with the node alias
-                node.setJobName(nodeInfo.getNodeAlias())
+                node.setNodeName(nodeInfo.getNodeName())
                         .setEnable(nodeInfo.getEnable())
                         .setSkipWhenFailed(nodeInfo.getSkipWhenFailed());
                 // if params are specified on the node, use them; otherwise fall back to the Job's params
                 if (!StringUtils.isBlank(nodeInfo.getNodeParams())) {
-                    node.setJobParams(nodeInfo.getNodeParams());
+                    node.setNodeParams(nodeInfo.getNodeParams());
                 } else {
-                    node.setJobParams(jobInfo.getJobParams());
+                    node.setNodeParams(jobInfo.getJobParams());
                 }
             }
diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java
index bbc29c40..24695e86 100644
--- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java
+++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java
@@ -4,9 +4,18 @@ import com.alibaba.fastjson.JSON;
 import com.github.kfcfans.powerjob.common.PowerJobException;
 import com.github.kfcfans.powerjob.common.TimeExpressionType;
 import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
+import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowNodeRequest;
 import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
 import tech.powerjob.server.common.SJ;
 import tech.powerjob.server.common.constants.SwitchableStatus;
+import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
 import tech.powerjob.server.common.utils.CronExpression;
 import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
 import tech.powerjob.server.persistence.remote.model.JobInfoDO;
@@ -16,17 +25,6 @@ import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
 import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
 import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
 import tech.powerjob.server.remote.server.redirector.DesignateServer;
-import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
-import com.github.kfcfans.powerjob.common.request.http.AddWorkflowNodeRequest;
-import com.github.kfcfans.powerjob.common.request.http.ModifyWorkflowNodeRequest;
-import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowDAGRequest;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.BeanUtils;
-import org.springframework.stereotype.Service;
-import org.springframework.util.CollectionUtils;
-import org.springframework.util.StringUtils;
 
 import javax.annotation.Resource;
 import javax.transaction.Transactional;
@@ -61,8 +59,8 @@ public class WorkflowService {
      *
      * @param req request
     * @return workflow ID
-     * @exception ParseException exception
     */
+    @Transactional(rollbackOn = Exception.class)
     public Long saveWorkflow(SaveWorkflowRequest req) throws ParseException {
         req.valid();
@@ -93,11 +91,56 @@
         } else {
             wf.setTimeExpression(null);
         }
+        // under the current interaction design, the very first save never carries DAG info
+        if (req.getId() != null) {
+            wf.setPeDAG(validateDAGAndConvert2String(req));
+        }
 
         WorkflowInfoDO newEntity = workflowInfoRepository.saveAndFlush(wf);
+
         return newEntity.getId();
     }
 
+    /**
+     * Save the DAG info.
+     * Note that dissociated node records are physically deleted here.
+     */
+    private String validateDAGAndConvert2String(SaveWorkflowRequest req) {
+        if (req.getDag() == null || CollectionUtils.isEmpty(req.getDag().getNodes())) {
+            return "{}";
+        }
+        PEWorkflowDAG dag = req.getDag();
+        if (!WorkflowDAGUtils.valid(dag)) {
+            throw new PowerJobException("illegal DAG");
+        }
+        // Note: only the basic graph info is stored here: nodeId, jobId, jobName (nodeAlias)
+        // jobId and jobName are always taken from the node record
+        List<Long> nodeIdList = Lists.newArrayList();
+        HashMap<Long, WorkflowNodeInfoDO> nodeIdNodInfoMap = Maps.newHashMap();
+        workflowNodeInfoRepository.findByWorkflowId(req.getId()).forEach(
+                e -> nodeIdNodInfoMap.put(e.getId(), e)
+        );
+        for (PEWorkflowDAG.Node node : dag.getNodes()) {
+            WorkflowNodeInfoDO nodeInfo = nodeIdNodInfoMap.get(node.getNodeId());
+            if (nodeInfo == null) {
+                throw new PowerJobException("can't find node info by id :" + node.getNodeId());
+            }
+            if (!req.getId().equals(nodeInfo.getWorkflowId())) {
+                throw new PowerJobException("workflowId of current node must be same to workflowId");
+            }
+            // the node name stored on the node record is guaranteed to be non-empty
+            node.setNodeName(nodeInfo.getNodeName()).setJobId(nodeInfo.getJobId());
+            // clear other fields
+            node.setEnable(null).setSkipWhenFailed(null).setInstanceId(null).setResult(null);
+            nodeIdList.add(node.getNodeId());
+        }
+
+        int deleteCount = workflowNodeInfoRepository.deleteByWorkflowIdAndIdNotIn(req.getId(), nodeIdList);
+        log.warn("[WorkflowService-{}]delete {} dissociative nodes of workflow", req.getId(), deleteCount);
+        return JSON.toJSONString(dag);
+    }
+
+
     /**
      * Deep-copy a workflow
     *
@@ -238,89 +281,39 @@
         return wfInstanceId;
     }
+
     /**
-     * Add workflow nodes
+     * Save workflow nodes (create or update)
      *
-     * @param requestList workflow node list
-     * @return the newly added node info
+     * @param workflowNodeRequestList workflow nodes
+     * @return the updated or newly created workflow node info
     */
-    public List<WorkflowNodeInfoDO> addWorkflowNode(List<AddWorkflowNodeRequest> requestList) {
-        if (CollectionUtils.isEmpty(requestList)) {
-            throw new PowerJobException("requestList must be not null");
+    public List<WorkflowNodeInfoDO> saveWorkflowNode(List<SaveWorkflowNodeRequest> workflowNodeRequestList) {
+        if (CollectionUtils.isEmpty(workflowNodeRequestList)) {
+            return Collections.emptyList();
         }
-
-        List<WorkflowNodeInfoDO> saveList = new ArrayList<>();
-        for (AddWorkflowNodeRequest addWorkflowNodeRequest : requestList) {
-            // validate
-            addWorkflowNodeRequest.valid();
-            permissionCheck(addWorkflowNodeRequest.getWorkflowId(), addWorkflowNodeRequest.getAppId());
-
-            WorkflowNodeInfoDO workflowNodeInfoDO = new WorkflowNodeInfoDO();
-            BeanUtils.copyProperties(addWorkflowNodeRequest, workflowNodeInfoDO);
+        ArrayList<WorkflowNodeInfoDO> res = new ArrayList<>(workflowNodeRequestList.size());
+        for (SaveWorkflowNodeRequest req : workflowNodeRequestList) {
+            req.valid();
+            permissionCheck(req.getWorkflowId(), req.getAppId());
+            WorkflowNodeInfoDO workflowNodeInfo;
+            if (req.getId() != null) {
+                workflowNodeInfo = workflowNodeInfoRepository.findById(req.getId()).orElseThrow(() -> new IllegalArgumentException("can't find workflow Node by id: " + req.getId()));
+            } else {
+                workflowNodeInfo = new WorkflowNodeInfoDO();
+                workflowNodeInfo.setGmtCreate(new Date());
+            }
+            BeanUtils.copyProperties(req, workflowNodeInfo);
             // if the name is empty, default to the job name
-            if (StringUtils.isEmpty(addWorkflowNodeRequest.getNodeAlias())) {
-                JobInfoDO jobInfoDO = jobInfoRepository.findById(addWorkflowNodeRequest.getJobId()).orElseThrow(() -> new IllegalArgumentException("can't find job by id: " + addWorkflowNodeRequest.getJobId()));
-                workflowNodeInfoDO.setNodeAlias(jobInfoDO.getJobName());
+            if (StringUtils.isEmpty(workflowNodeInfo.getNodeName())) {
+                JobInfoDO jobInfoDO = jobInfoRepository.findById(req.getJobId()).orElseThrow(() -> new IllegalArgumentException("can't find job by id: " + req.getJobId()));
+                workflowNodeInfo.setNodeName(jobInfoDO.getJobName());
             }
-            Date current = new Date();
-            workflowNodeInfoDO.setGmtCreate(current);
-            workflowNodeInfoDO.setGmtModified(current);
-            saveList.add(workflowNodeInfoDO);
+            workflowNodeInfo.setGmtModified(new Date());
+            workflowNodeInfo = workflowNodeInfoRepository.saveAndFlush(workflowNodeInfo);
+            res.add(workflowNodeInfo);
         }
-        return workflowNodeInfoRepository.saveAll(saveList);
-    }
-
-    /**
-     * Modify workflow node info
-     *
-     * @param req workflow node info
-     */
-    public void modifyWorkflowNode(ModifyWorkflowNodeRequest req) {
-        req.valid();
-        permissionCheck(req.getWorkflowId(), req.getAppId());
-        WorkflowNodeInfoDO workflowNodeInfoDO = workflowNodeInfoRepository.findById(req.getId()).orElseThrow(() -> new IllegalArgumentException("can't find workflow Node by id: " + req.getId()));
-        BeanUtils.copyProperties(req, workflowNodeInfoDO);
-        workflowNodeInfoDO.setGmtModified(new Date());
-        workflowNodeInfoRepository.saveAndFlush(workflowNodeInfoDO);
-    }
-
-    /**
-     * Save the DAG info.
-     * Note that dissociated node records are physically deleted here.
-     */
-    @Transactional(rollbackOn = Exception.class)
-    public void saveWorkflowDAG(SaveWorkflowDAGRequest req) {
-        req.valid();
-        Long workflowId = req.getId();
-        WorkflowInfoDO workflowInfoDO = permissionCheck(workflowId, req.getAppId());
-        PEWorkflowDAG dag = req.getDag();
-        if (!WorkflowDAGUtils.valid(dag)) {
-            throw new PowerJobException("illegal DAG");
-        }
-        // Note: only the basic graph info is stored here: nodeId, jobId, jobName (nodeAlias)
-        // jobId and jobName are always taken from the node record
-        List<Long> nodeIdList = Lists.newArrayList();
-        HashMap<Long, WorkflowNodeInfoDO> nodeIdNodInfoMap = Maps.newHashMap();
-        workflowNodeInfoRepository.findByWorkflowId(workflowId).forEach(
-                e -> nodeIdNodInfoMap.put(e.getId(), e)
-        );
-        for (PEWorkflowDAG.Node node : dag.getNodes()) {
-            WorkflowNodeInfoDO nodeInfo = nodeIdNodInfoMap.get(node.getNodeId());
-            if (nodeInfo == null) {
-                throw new PowerJobException("can't find node info by id :" + node.getNodeId());
-            }
-            if (!workflowInfoDO.getId().equals(nodeInfo.getWorkflowId())) {
-                throw new PowerJobException("node workflowId must be same to workflowId");
-            }
-            // the alias stored on the node record is guaranteed to be non-empty
-            node.setJobName(nodeInfo.getNodeAlias()).setJobId(nodeInfo.getJobId());
-            nodeIdList.add(node.getNodeId());
-        }
-        workflowInfoDO.setPeDAG(JSON.toJSONString(dag));
-        workflowInfoRepository.saveAndFlush(workflowInfoDO);
-        // physically delete the workflow's "dissociated" nodes (nodes no longer referenced by the workflow DAG)
-        int deleteCount = workflowNodeInfoRepository.deleteByWorkflowIdAndIdNotIn(workflowId, nodeIdList);
-        log.warn("[WorkflowService-{}]delete {} dissociative nodes of workflow", workflowId, deleteCount);
+        return res;
     }
 
@@ -349,8 +342,8 @@
             if (nodeInfo != null) {
                 node.setEnable(nodeInfo.getEnable())
                         .setSkipWhenFailed(nodeInfo.getSkipWhenFailed())
-                        .setJobName(nodeInfo.getNodeAlias())
-                        .setJobParams(nodeInfo.getNodeParams());
+                        .setNodeName(nodeInfo.getNodeName())
+                        .setNodeParams(nodeInfo.getNodeParams());
             } else {
                 // enabled by default and failure skipping not allowed
diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAGUtils.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAGUtils.java
index c836ef02..9578ae0a 100644
--- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAGUtils.java
+++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/algorithm/WorkflowDAGUtils.java
@@ -212,7 +212,7 @@
         // create nodes
         peWorkflowDAG.getNodes().forEach(node -> {
             Long nodeId = node.getNodeId();
-            WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), node.getNodeId(), node.getJobId(), node.getJobName(), InstanceStatus.WAITING_DISPATCH.getV());
+            WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), node.getNodeId(), node.getJobId(), node.getNodeName(), InstanceStatus.WAITING_DISPATCH.getV());
             id2Node.put(nodeId, n);
 
             // initially, every node is treated as a root
diff --git a/powerjob-server/powerjob-server-migrate/src/main/java/tech/powerjob/server/migrate/V3ToV4MigrateService.java b/powerjob-server/powerjob-server-migrate/src/main/java/tech/powerjob/server/migrate/V3ToV4MigrateService.java
index e7dbab62..908f5313 100644
--- a/powerjob-server/powerjob-server-migrate/src/main/java/tech/powerjob/server/migrate/V3ToV4MigrateService.java
+++ b/powerjob-server/powerjob-server-migrate/src/main/java/tech/powerjob/server/migrate/V3ToV4MigrateService.java
@@ -224,7 +224,7 @@
                 nodeInfo.setAppId(workflowInfo.getAppId());
                 nodeInfo.setJobId(jobInfo.getId());
                 // enabled by default, failure skipping not allowed, params consistent with the Job
-                nodeInfo.setNodeAlias(jobInfo.getJobName());
+                nodeInfo.setNodeName(jobInfo.getJobName());
                 nodeInfo.setNodeParams(jobInfo.getJobParams());
                 nodeInfo.setEnable(true);
                 nodeInfo.setSkipWhenFailed(false);
@@ -234,7 +234,7 @@
                 nodeInfo = workflowNodeInfoRepository.saveAndFlush(nodeInfo);
                 // update node ID
                 node.setNodeId(nodeInfo.getId());
-                node.setJobName(nodeInfo.getNodeAlias());
+                node.setNodeName(nodeInfo.getNodeName());
                 jobId2NodeIdMap.put(node.getJobId(), node.getNodeId());
             }
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/WorkflowNodeInfoDO.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/WorkflowNodeInfoDO.java
index 23d20b97..1005e822 100644
--- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/WorkflowNodeInfoDO.java
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/WorkflowNodeInfoDO.java
@@ -5,6 +5,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.hibernate.annotations.GenericGenerator;
+import tech.powerjob.server.common.constants.NodeType;
 
 import javax.persistence.*;
 import java.util.Date;
@@ -33,16 +34,19 @@ public class WorkflowNodeInfoDO {
     @Column(nullable = false)
     private Long workflowId;
 
+    /**
+     * node type {@link NodeType}
+     */
+    private Integer type;
     /**
     * job ID
     */
     @Column(nullable = false)
     private Long jobId;
     /**
-     * node alias; defaults to the corresponding job's name
+     * node name; defaults to the corresponding job's name
     */
-    @Column(nullable = false)
-    private String nodeAlias;
+    private String nodeName;
     /**
     * node params
     */
@@ -58,6 +62,9 @@
     */
     @Column(nullable = false)
     private Boolean skipWhenFailed;
+
+    @Lob
+    private String extra;
     /**
     * creation time
     */
diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/OpenAPIController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/OpenAPIController.java
index 6b258609..5995016d 100644
--- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/OpenAPIController.java
+++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/OpenAPIController.java
@@ -193,21 +193,9 @@
         return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, initParams, delay));
     }
 
-    @PostMapping(OpenAPIConstant.ADD_WORKFLOW_NODE)
-    public ResultDTO<List<WorkflowNodeInfoDO>> addWorkflowNode(@RequestBody List<AddWorkflowNodeRequest> request) {
-        return ResultDTO.success(workflowService.addWorkflowNode(request));
-    }
-
-    @PostMapping(OpenAPIConstant.SAVE_WORKFLOW_DAG)
-    public ResultDTO<Void> saveWorkflowDAG(@RequestBody SaveWorkflowDAGRequest request) {
-        workflowService.saveWorkflowDAG(request);
-        return ResultDTO.success(null);
-    }
-
-    @PostMapping(OpenAPIConstant.MODIFY_WORKFLOW_NODE)
-    public ResultDTO<Void> modifyWorkflowNode(@RequestBody ModifyWorkflowNodeRequest request) {
-        workflowService.modifyWorkflowNode(request);
-        return ResultDTO.success(null);
+    @PostMapping(OpenAPIConstant.SAVE_WORKFLOW_NODE)
+    public ResultDTO<List<WorkflowNodeInfoDO>> saveWorkflowNode(@RequestBody List<SaveWorkflowNodeRequest> request) {
+        return ResultDTO.success(workflowService.saveWorkflowNode(request));
     }
 
     /* ************* Workflow Instance section ************* */
diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/WorkflowController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/WorkflowController.java
index 6071bbae..a8f8bad0 100644
--- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/WorkflowController.java
+++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/WorkflowController.java
@@ -1,8 +1,6 @@
 package tech.powerjob.server.web.controller;
 
-import com.github.kfcfans.powerjob.common.request.http.AddWorkflowNodeRequest;
-import com.github.kfcfans.powerjob.common.request.http.ModifyWorkflowNodeRequest;
-import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowDAGRequest;
+import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowNodeRequest;
 import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
 import com.github.kfcfans.powerjob.common.response.ResultDTO;
 import tech.powerjob.server.common.constants.SwitchableStatus;
@@ -100,22 +98,11 @@
         return ResultDTO.success(WorkflowInfoVO.from(workflowInfoDO));
     }
 
-    @PostMapping("/addNode")
-    public ResultDTO<List<WorkflowNodeInfoDO>> addWorkflowNode(@RequestBody List<AddWorkflowNodeRequest> request) {
-        return ResultDTO.success(workflowService.addWorkflowNode(request));
+    @PostMapping("/saveNode")
+    public ResultDTO<List<WorkflowNodeInfoDO>> addWorkflowNode(@RequestBody List<SaveWorkflowNodeRequest> request) {
+        return ResultDTO.success(workflowService.saveWorkflowNode(request));
     }
 
-    @PostMapping("/saveDAG")
-    public ResultDTO<Void> saveWorkflowDAG(@RequestBody SaveWorkflowDAGRequest request) {
-        workflowService.saveWorkflowDAG(request);
-        return ResultDTO.success(null);
-    }
-
-    @PostMapping("/modifyNode")
-    public ResultDTO<Void> modifyWorkflowNode(@RequestBody ModifyWorkflowNodeRequest request) {
-        workflowService.modifyWorkflowNode(request);
-        return ResultDTO.success(null);
-    }
 
     private static PageResult<WorkflowInfoVO> convertPage(Page<WorkflowInfoDO> originPage) {
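
Reviewer note, not part of the patch: the sketch below pulls together how the reworked OpenAPI is meant to be used end to end, following the updated TestWorkflow above. The DAG is no longer saved through a dedicated /saveWorkflowDAG endpoint; nodes are saved first via saveWorkflowNode (which assigns nodeIds), and the DAG referencing those nodeIds is submitted together with the workflow through a second saveWorkflow call. The OhMyClient connection arguments, the jobId value (1L), the workflow fields set here, and the use of `var` (Java 10+) for the returned node list are illustrative assumptions, not values taken from this diff.

```java
import com.github.kfcfans.powerjob.client.OhMyClient;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowNodeRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.google.common.collect.Lists;

import java.util.List;

class SaveWorkflowFlowSketch {

    void saveWorkflowWithDag() throws Exception {
        // assumed connection parameters; replace with a real server address / app / password
        OhMyClient ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");

        // 1. first save: just the workflow shell, no DAG yet; the server returns the workflowId
        SaveWorkflowRequest req = new SaveWorkflowRequest();
        req.setWfName("workflow-created-by-client");
        req.setTimeExpressionType(TimeExpressionType.API);
        Long workflowId = ohMyClient.saveWorkflow(req).getData();
        req.setId(workflowId);

        // 2. register the nodes; the server assigns each one a nodeId (the record id)
        SaveWorkflowNodeRequest node1 = new SaveWorkflowNodeRequest();
        node1.setWorkflowId(workflowId);
        node1.setJobId(1L);                 // assumed existing jobId
        node1.setNodeName("DAG-Node-1");

        SaveWorkflowNodeRequest node2 = new SaveWorkflowNodeRequest();
        node2.setWorkflowId(workflowId);
        node2.setJobId(1L);
        node2.setNodeName("DAG-Node-2");

        // 'var' used to avoid naming the node DTO type returned by saveWorkflowNode
        var savedNodes = ohMyClient.saveWorkflowNode(Lists.newArrayList(node1, node2)).getData();

        // 3. describe the DAG with the returned nodeIds and save the workflow again, now with the DAG attached
        List<PEWorkflowDAG.Node> nodes = Lists.newArrayList(
                new PEWorkflowDAG.Node(savedNodes.get(0).getId(), 1L, "DAG-Node-1"),
                new PEWorkflowDAG.Node(savedNodes.get(1).getId(), 1L, "DAG-Node-2"));
        List<PEWorkflowDAG.Edge> edges = Lists.newArrayList(
                new PEWorkflowDAG.Edge(savedNodes.get(0).getId(), savedNodes.get(1).getId()));
        req.setDag(new PEWorkflowDAG(nodes, edges));
        ohMyClient.saveWorkflow(req);
    }
}
```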