refactor: optimize API of workflow

This commit is contained in:
Echo009 2021-03-08 01:07:56 +08:00
parent 59d4df1422
commit 38929bff6d
16 changed files with 200 additions and 302 deletions

View File

@ -360,49 +360,22 @@ public class OhMyClient {
} }
/**
* 保存工作流 DAG
*
* @param request DAG of Workflow
* @return Standard return object
*/
public ResultDTO<Void> saveWorkflowDag(SaveWorkflowDAGRequest request) {
request.setAppId(appId);
MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
String json = JsonUtils.toJSONStringUnsafe(request);
String post = postHA(OpenAPIConstant.SAVE_WORKFLOW_DAG, RequestBody.create(jsonType, json));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
/** /**
* 添加工作流节点 * 添加工作流节点
* *
* @param requestList Node info list of Workflow * @param requestList Node info list of Workflow
* @return Standard return object * @return Standard return object
*/ */
public ResultDTO<List<WorkflowNodeInfoDTO>> addWorkflowNode(List<AddWorkflowNodeRequest> requestList) { public ResultDTO<List<WorkflowNodeInfoDTO>> saveWorkflowNode(List<SaveWorkflowNodeRequest> requestList) {
for (AddWorkflowNodeRequest addWorkflowNodeRequest : requestList) { for (SaveWorkflowNodeRequest saveWorkflowNodeRequest : requestList) {
addWorkflowNodeRequest.setAppId(appId); saveWorkflowNodeRequest.setAppId(appId);
} }
MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE); MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
String json = JsonUtils.toJSONStringUnsafe(requestList); String json = JsonUtils.toJSONStringUnsafe(requestList);
String post = postHA(OpenAPIConstant.ADD_WORKFLOW_NODE, RequestBody.create(jsonType, json)); String post = postHA(OpenAPIConstant.SAVE_WORKFLOW_NODE, RequestBody.create(jsonType, json));
return JSON.parseObject(post, WF_NODE_LIST_RESULT_TYPE); return JSON.parseObject(post, WF_NODE_LIST_RESULT_TYPE);
} }
/**
* 修改工作流节点
*
* @param request Node info of Workflow
* @return Standard return object
*/
public ResultDTO<Void> modifyWorkflowNode(ModifyWorkflowNodeRequest request) {
request.setAppId(appId);
MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
String json = JsonUtils.toJSONStringUnsafe(request);
String post = postHA(OpenAPIConstant.MODIFY_WORKFLOW_NODE, RequestBody.create(jsonType, json));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
/** /**

View File

@ -62,6 +62,50 @@ class TestWorkflow extends ClientInitializer {
System.out.println(res); System.out.println(res);
Assertions.assertNotNull(res); Assertions.assertNotNull(res);
req.setId(res.getData());
// 创建节点
SaveWorkflowNodeRequest saveWorkflowNodeRequest1 = new SaveWorkflowNodeRequest();
saveWorkflowNodeRequest1.setJobId(1L);
saveWorkflowNodeRequest1.setWorkflowId(req.getId());
saveWorkflowNodeRequest1.setNodeName("DAG-Node-1");
SaveWorkflowNodeRequest saveWorkflowNodeRequest2 = new SaveWorkflowNodeRequest();
saveWorkflowNodeRequest2.setJobId(1L);
saveWorkflowNodeRequest2.setWorkflowId(req.getId());
saveWorkflowNodeRequest2.setNodeName("DAG-Node-2");
SaveWorkflowNodeRequest saveWorkflowNodeRequest3 = new SaveWorkflowNodeRequest();
saveWorkflowNodeRequest3.setJobId(1L);
saveWorkflowNodeRequest3.setWorkflowId(req.getId());
saveWorkflowNodeRequest3.setNodeName("DAG-Node-3");
List<WorkflowNodeInfoDTO> nodeList = ohMyClient.saveWorkflowNode(Lists.newArrayList(saveWorkflowNodeRequest1,saveWorkflowNodeRequest2,saveWorkflowNodeRequest3)).getData();
System.out.println(nodeList);
Assertions.assertNotNull(nodeList);
// DAG
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
nodes.add(new PEWorkflowDAG.Node(nodeList.get(0).getId(), 1L, "DAG-Node-1"));
nodes.add(new PEWorkflowDAG.Node(nodeList.get(1).getId(), 1L, "DAG-Node-2"));
nodes.add(new PEWorkflowDAG.Node(nodeList.get(2).getId(), 1L, "DAG-Node-3"));
edges.add(new PEWorkflowDAG.Edge(nodeList.get(0).getId(), nodeList.get(1).getId()));
edges.add(new PEWorkflowDAG.Edge(nodeList.get(1).getId(), nodeList.get(2).getId()));
PEWorkflowDAG peWorkflowDAG = new PEWorkflowDAG(nodes, edges);
// 保存完整信息
req.setDag(peWorkflowDAG);
res = ohMyClient.saveWorkflow(req);
System.out.println(res);
Assertions.assertNotNull(res);
} }
@Test @Test
@ -71,52 +115,6 @@ class TestWorkflow extends ClientInitializer {
Assertions.assertNotNull(res); Assertions.assertNotNull(res);
} }
@Test
void testAddWorkflowNode() {
AddWorkflowNodeRequest addWorkflowNodeRequest = new AddWorkflowNodeRequest();
addWorkflowNodeRequest.setJobId(1L);
addWorkflowNodeRequest.setWorkflowId(WF_ID);
ResultDTO<List<WorkflowNodeInfoDTO>> res = ohMyClient.addWorkflowNode(Lists.newArrayList(addWorkflowNodeRequest));
System.out.println(res);
Assertions.assertNotNull(res);
}
@Test
void testModifyWorkflowNode() {
ModifyWorkflowNodeRequest modifyWorkflowNodeRequest = new ModifyWorkflowNodeRequest();
modifyWorkflowNodeRequest.setWorkflowId(WF_ID);
modifyWorkflowNodeRequest.setId(1L);
modifyWorkflowNodeRequest.setNodeAlias("(๑•̀ㅂ•́)و✧");
modifyWorkflowNodeRequest.setEnable(false);
modifyWorkflowNodeRequest.setSkipWhenFailed(false);
ResultDTO<Void> res = ohMyClient.modifyWorkflowNode(modifyWorkflowNodeRequest);
System.out.println(res);
Assertions.assertNotNull(res);
}
@Test
void testSaveWorkflowDag() {
// DAG
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "DAG-Node-1"));
nodes.add(new PEWorkflowDAG.Node(2L, 1L, "DAG-Node-2"));
nodes.add(new PEWorkflowDAG.Node(3L, 1L, "DAG-Node-3"));
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
edges.add(new PEWorkflowDAG.Edge(2L, 3L));
PEWorkflowDAG peWorkflowDAG = new PEWorkflowDAG(nodes, edges);
SaveWorkflowDAGRequest saveWorkflowDAGRequest = new SaveWorkflowDAGRequest();
saveWorkflowDAGRequest.setId(WF_ID);
saveWorkflowDAGRequest.setDag(peWorkflowDAG);
ResultDTO<Void> res = ohMyClient.saveWorkflowDag(saveWorkflowDAGRequest);
System.out.println(res);
Assertions.assertNotNull(res);
}
@Test @Test
void testDisableWorkflow() { void testDisableWorkflow() {

View File

@ -46,9 +46,7 @@ public class OpenAPIConstant {
public static final String ENABLE_WORKFLOW = "/enableWorkflow"; public static final String ENABLE_WORKFLOW = "/enableWorkflow";
public static final String DELETE_WORKFLOW = "/deleteWorkflow"; public static final String DELETE_WORKFLOW = "/deleteWorkflow";
public static final String RUN_WORKFLOW = "/runWorkflow"; public static final String RUN_WORKFLOW = "/runWorkflow";
public static final String ADD_WORKFLOW_NODE = "/addWorkflowNode"; public static final String SAVE_WORKFLOW_NODE = "/addWorkflowNode";
public static final String MODIFY_WORKFLOW_NODE = "/modifyWorkflowNode";
public static final String SAVE_WORKFLOW_DAG = "/saveWorkflowDAG";
/* ************* WorkflowInstance 区 ************* */ /* ************* WorkflowInstance 区 ************* */

View File

@ -48,16 +48,16 @@ public class PEWorkflowDAG implements Serializable {
private Long jobId; private Long jobId;
/** /**
* job alias or job name * node name
*/ */
private String jobName; private String nodeName;
/* Instance running param, which is not required by DAG. */ /* Instance running param, which is not required by DAG. */
@JsonSerialize(using= ToStringSerializer.class) @JsonSerialize(using= ToStringSerializer.class)
private Long instanceId; private Long instanceId;
private String jobParams; private String nodeParams;
private Integer status; private Integer status;
@ -70,10 +70,10 @@ public class PEWorkflowDAG implements Serializable {
private Boolean skipWhenFailed; private Boolean skipWhenFailed;
public Node(Long nodeId,Long jobId, String jobName) { public Node(Long nodeId,Long jobId, String nodeName) {
this.nodeId = nodeId; this.nodeId = nodeId;
this.jobId = jobId; this.jobId = jobId;
this.jobName = jobName; this.nodeName = nodeName;
} }
} }

View File

@ -1,46 +0,0 @@
package com.github.kfcfans.powerjob.common.request.http;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import lombok.Data;
/**
* 新增工作流节点信息请求
*
* @author zenggonggu
* @since 2021/02/02
*/
@Data
public class ModifyWorkflowNodeRequest {
private Long id;
private Long appId;
private Long workflowId;
/**
* 节点别名默认为对应的任务名称
*/
private String nodeAlias;
/**
* 节点参数
*/
private String nodeParams;
/**
* 是否启用
*/
private Boolean enable;
/**
* 是否允许失败跳过
*/
private Boolean skipWhenFailed;
public void valid(){
CommonUtils.requireNonNull(this.id, "id can't be empty");
CommonUtils.requireNonNull(this.appId, "appId can't be empty");
CommonUtils.requireNonNull(this.nodeAlias, "nodeAlias can't be empty");
CommonUtils.requireNonNull(this.enable, "enable can't be empty");
CommonUtils.requireNonNull(this.skipWhenFailed, "skipWhenFailed can't be empty");
}
}

View File

@ -1,35 +0,0 @@
package com.github.kfcfans.powerjob.common.request.http;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import lombok.Data;
/**
* 新增工作流节点信息请求
*
* @author zenggonggu
* @since 2021/02/02
*/
@Data
public class SaveWorkflowDAGRequest {
/** workflowId **/
private Long id;
/** 所属应用IDOpenClient不需要用户填写自动填充*/
private Long appId;
/** 点线表示法*/
private PEWorkflowDAG dag;
public void valid() {
CommonUtils.requireNonNull(this.appId, "appId can't be empty");
CommonUtils.requireNonNull(this.id, "workflowId can't be empty");
CommonUtils.requireNonNull(dag, "dag can't be empty");
CommonUtils.requireNonNull(dag.getNodes(), "nodes can't be empty");
CommonUtils.requireNonNull(dag.getEdges(), "edges can't be empty");
for (PEWorkflowDAG.Node node : dag.getNodes()) {
// 清空其他信息
node.setEnable(null).setSkipWhenFailed(null).setInstanceId(null).setJobName(null).setResult(null);
}
}
}

View File

@ -12,11 +12,18 @@ import lombok.Data;
* @since 2021/02/02 * @since 2021/02/02
*/ */
@Data @Data
public class AddWorkflowNodeRequest {
public class SaveWorkflowNodeRequest {
private Long id;
private Long appId; private Long appId;
private Long workflowId; private Long workflowId;
/**
* 节点类型
*/
private int type = 1;
/** /**
* 任务 ID * 任务 ID
*/ */
@ -24,7 +31,7 @@ public class AddWorkflowNodeRequest {
/** /**
* 节点别名默认为对应的任务名称 * 节点别名默认为对应的任务名称
*/ */
private String nodeAlias; private String nodeName;
/** /**
* 节点参数 * 节点参数
*/ */
@ -42,5 +49,6 @@ public class AddWorkflowNodeRequest {
CommonUtils.requireNonNull(this.appId, "appId can't be empty"); CommonUtils.requireNonNull(this.appId, "appId can't be empty");
CommonUtils.requireNonNull(this.workflowId, "workflowId can't be empty"); CommonUtils.requireNonNull(this.workflowId, "workflowId can't be empty");
CommonUtils.requireNonNull(this.jobId, "jobId can't be empty"); CommonUtils.requireNonNull(this.jobId, "jobId can't be empty");
CommonUtils.requireNonNull(this.type, "type can't be empty");
} }
} }

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.common.request.http; package com.github.kfcfans.powerjob.common.request.http;
import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.Data; import lombok.Data;
@ -59,6 +60,9 @@ public class SaveWorkflowRequest implements Serializable {
*/ */
private List<Long> notifyUserIds = Lists.newLinkedList(); private List<Long> notifyUserIds = Lists.newLinkedList();
/** 点线表示法*/
private PEWorkflowDAG dag;
public void valid() { public void valid() {
CommonUtils.requireNonNull(wfName, "workflow name can't be empty"); CommonUtils.requireNonNull(wfName, "workflow name can't be empty");
CommonUtils.requireNonNull(appId, "appId can't be empty"); CommonUtils.requireNonNull(appId, "appId can't be empty");

View File

@ -0,0 +1,23 @@
package tech.powerjob.server.common.constants;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 节点类型
*
* @author Echo009
* @since 2021/3/7
*/
@Getter
@AllArgsConstructor
public enum NodeType {
/**
* 普通节点
*/
COMMON(1);
private final int code;
}

View File

@ -184,7 +184,7 @@ public class WorkflowInstanceManager {
readyNodes.forEach(readyNode -> { readyNodes.forEach(readyNode -> {
// 注意这里必须保证任务实例全部创建成功如果在这里创建实例部分失败会导致 DAG 信息不会更新已经生成的实例节点在工作流日志中没法展示 // 注意这里必须保证任务实例全部创建成功如果在这里创建实例部分失败会导致 DAG 信息不会更新已经生成的实例节点在工作流日志中没法展示
// instanceParam 传递的是工作流实例的 wfContext // instanceParam 传递的是工作流实例的 wfContext
Long instanceId = instanceService.create(readyNode.getJobId(), wfInfo.getAppId(), readyNode.getJobParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis()); Long instanceId = instanceService.create(readyNode.getJobId(), wfInfo.getAppId(), readyNode.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis());
readyNode.setInstanceId(instanceId); readyNode.setInstanceId(instanceId);
readyNode.setStatus(InstanceStatus.RUNNING.getV()); readyNode.setStatus(InstanceStatus.RUNNING.getV());
@ -321,7 +321,7 @@ public class WorkflowInstanceManager {
for (PEWorkflowDAG.Node readyNode : readyNodes) { for (PEWorkflowDAG.Node readyNode : readyNodes) {
// 同理这里必须保证任务实例全部创建成功避免部分失败导致已经生成的实例节点在工作流日志中没法展示 // 同理这里必须保证任务实例全部创建成功避免部分失败导致已经生成的实例节点在工作流日志中没法展示
// instanceParam 传递的是工作流实例的 wfContext // instanceParam 传递的是工作流实例的 wfContext
Long newInstanceId = instanceService.create(readyNode.getJobId(), wfInstance.getAppId(), readyNode.getJobParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis()); Long newInstanceId = instanceService.create(readyNode.getJobId(), wfInstance.getAppId(), readyNode.getNodeParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
readyNode.setInstanceId(newInstanceId); readyNode.setInstanceId(newInstanceId);
readyNode.setStatus(InstanceStatus.RUNNING.getV()); readyNode.setStatus(InstanceStatus.RUNNING.getV());
log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), newInstanceId); log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), newInstanceId);
@ -394,18 +394,18 @@ public class WorkflowInstanceManager {
// 默认启用 + 不允许失败跳过 // 默认启用 + 不允许失败跳过
node.setEnable(true) node.setEnable(true)
.setSkipWhenFailed(false) .setSkipWhenFailed(false)
.setJobParams(jobInfo.getJobParams()); .setNodeParams(jobInfo.getJobParams());
} else { } else {
WorkflowNodeInfoDO nodeInfo = nodeInfoOpt.get(); WorkflowNodeInfoDO nodeInfo = nodeInfoOpt.get();
// 使用节点别名覆盖 // 使用节点别名覆盖
node.setJobName(nodeInfo.getNodeAlias()) node.setNodeName(nodeInfo.getNodeName())
.setEnable(nodeInfo.getEnable()) .setEnable(nodeInfo.getEnable())
.setSkipWhenFailed(nodeInfo.getSkipWhenFailed()); .setSkipWhenFailed(nodeInfo.getSkipWhenFailed());
// 如果节点中指定了参数信息则取节点的否则取 Job 上的 // 如果节点中指定了参数信息则取节点的否则取 Job 上的
if (!StringUtils.isBlank(nodeInfo.getNodeParams())) { if (!StringUtils.isBlank(nodeInfo.getNodeParams())) {
node.setJobParams(nodeInfo.getNodeParams()); node.setNodeParams(nodeInfo.getNodeParams());
} else { } else {
node.setJobParams(jobInfo.getJobParams()); node.setNodeParams(jobInfo.getJobParams());
} }
} }

View File

@ -4,9 +4,18 @@ import com.alibaba.fastjson.JSON;
import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowNodeRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import tech.powerjob.server.common.SJ; import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import tech.powerjob.server.common.utils.CronExpression; import tech.powerjob.server.common.utils.CronExpression;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO;
@ -16,17 +25,6 @@ import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer; import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import com.github.kfcfans.powerjob.common.request.http.AddWorkflowNodeRequest;
import com.github.kfcfans.powerjob.common.request.http.ModifyWorkflowNodeRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowDAGRequest;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.transaction.Transactional; import javax.transaction.Transactional;
@ -61,8 +59,8 @@ public class WorkflowService {
* *
* @param req 请求 * @param req 请求
* @return 工作流ID * @return 工作流ID
* @exception ParseException 异常
*/ */
@Transactional(rollbackOn = Exception.class)
public Long saveWorkflow(SaveWorkflowRequest req) throws ParseException { public Long saveWorkflow(SaveWorkflowRequest req) throws ParseException {
req.valid(); req.valid();
@ -93,11 +91,56 @@ public class WorkflowService {
} else { } else {
wf.setTimeExpression(null); wf.setTimeExpression(null);
} }
// 在当前的交互设计下首次创建一定不会有 DAG 信息
if (req.getId() != null) {
wf.setPeDAG(validateDAGAndConvert2String(req));
}
WorkflowInfoDO newEntity = workflowInfoRepository.saveAndFlush(wf); WorkflowInfoDO newEntity = workflowInfoRepository.saveAndFlush(wf);
return newEntity.getId(); return newEntity.getId();
} }
/**
* 保存 DAG 信息
* 这里会物理删除游离的节点信息
*/
private String validateDAGAndConvert2String(SaveWorkflowRequest req) {
if (req.getDag() == null || CollectionUtils.isEmpty(req.getDag().getNodes())) {
return "{}";
}
PEWorkflowDAG dag = req.getDag();
if (!WorkflowDAGUtils.valid(dag)) {
throw new PowerJobException("illegal DAG");
}
// 注意这里只会保存图相关的基础信息nodeId,jobId,jobName(nodeAlias)
// 其中 jobIdjobName 均以节点中的信息为准
List<Long> nodeIdList = Lists.newArrayList();
HashMap<Long, WorkflowNodeInfoDO> nodeIdNodInfoMap = Maps.newHashMap();
workflowNodeInfoRepository.findByWorkflowId(req.getId()).forEach(
e -> nodeIdNodInfoMap.put(e.getId(), e)
);
for (PEWorkflowDAG.Node node : dag.getNodes()) {
WorkflowNodeInfoDO nodeInfo = nodeIdNodInfoMap.get(node.getNodeId());
if (nodeInfo == null) {
throw new PowerJobException("can't find node info by id :" + node.getNodeId());
}
if (!req.getId().equals(nodeInfo.getWorkflowId())) {
throw new PowerJobException("workflowId of current node must be same to workflowId");
}
// 节点中的名称信息一定是非空的
node.setNodeName(nodeInfo.getNodeName()).setJobId(nodeInfo.getJobId());
// 清空其他信息
node.setEnable(null).setSkipWhenFailed(null).setInstanceId(null).setResult(null);
nodeIdList.add(node.getNodeId());
}
int deleteCount = workflowNodeInfoRepository.deleteByWorkflowIdAndIdNotIn(req.getId(), nodeIdList);
log.warn("[WorkflowService-{}]delete {} dissociative nodes of workflow", req.getId(), deleteCount);
return JSON.toJSONString(dag);
}
/** /**
* 深度复制工作流 * 深度复制工作流
* *
@ -238,89 +281,39 @@ public class WorkflowService {
return wfInstanceId; return wfInstanceId;
} }
/** /**
* 添加工作流节点 * 保存工作流节点新增 或者 保存
* *
* @param requestList 工作流节点列表 * @param workflowNodeRequestList 工作流节点
* @return 新增的节点信息 * @return 更新 或者 创建后的工作流节点信息
*/ */
public List<WorkflowNodeInfoDO> addWorkflowNode(List<AddWorkflowNodeRequest> requestList) { public List<WorkflowNodeInfoDO> saveWorkflowNode(List<SaveWorkflowNodeRequest> workflowNodeRequestList) {
if (CollectionUtils.isEmpty(requestList)) { if (CollectionUtils.isEmpty(workflowNodeRequestList)) {
throw new PowerJobException("requestList must be not null"); return Collections.emptyList();
} }
ArrayList<WorkflowNodeInfoDO> res = new ArrayList<>(workflowNodeRequestList.size());
List<WorkflowNodeInfoDO> saveList = new ArrayList<>(); for (SaveWorkflowNodeRequest req : workflowNodeRequestList) {
for (AddWorkflowNodeRequest addWorkflowNodeRequest : requestList) { req.valid();
// 校验 permissionCheck(req.getWorkflowId(), req.getAppId());
addWorkflowNodeRequest.valid(); WorkflowNodeInfoDO workflowNodeInfo;
permissionCheck(addWorkflowNodeRequest.getWorkflowId(), addWorkflowNodeRequest.getAppId()); if (req.getId() != null) {
workflowNodeInfo = workflowNodeInfoRepository.findById(req.getId()).orElseThrow(() -> new IllegalArgumentException("can't find workflow Node by id: " + req.getId()));
WorkflowNodeInfoDO workflowNodeInfoDO = new WorkflowNodeInfoDO(); } else {
BeanUtils.copyProperties(addWorkflowNodeRequest, workflowNodeInfoDO); workflowNodeInfo = new WorkflowNodeInfoDO();
workflowNodeInfo.setGmtCreate(new Date());
}
BeanUtils.copyProperties(req, workflowNodeInfo);
// 如果名称为空则默认取任务名称 // 如果名称为空则默认取任务名称
if (StringUtils.isEmpty(addWorkflowNodeRequest.getNodeAlias())) { if (StringUtils.isEmpty(workflowNodeInfo.getNodeName())) {
JobInfoDO jobInfoDO = jobInfoRepository.findById(addWorkflowNodeRequest.getJobId()).orElseThrow(() -> new IllegalArgumentException("can't find job by id: " + addWorkflowNodeRequest.getJobId())); JobInfoDO jobInfoDO = jobInfoRepository.findById(req.getJobId()).orElseThrow(() -> new IllegalArgumentException("can't find job by id: " + req.getJobId()));
workflowNodeInfoDO.setNodeAlias(jobInfoDO.getJobName()); workflowNodeInfo.setNodeName(jobInfoDO.getJobName());
} }
Date current = new Date(); workflowNodeInfo.setGmtModified(new Date());
workflowNodeInfoDO.setGmtCreate(current); workflowNodeInfo = workflowNodeInfoRepository.saveAndFlush(workflowNodeInfo);
workflowNodeInfoDO.setGmtModified(current); res.add(workflowNodeInfo);
saveList.add(workflowNodeInfoDO);
} }
return workflowNodeInfoRepository.saveAll(saveList); return res;
}
/**
* 修改工作流节点信息
*
* @param req 工作流节点信息
*/
public void modifyWorkflowNode(ModifyWorkflowNodeRequest req) {
req.valid();
permissionCheck(req.getWorkflowId(), req.getAppId());
WorkflowNodeInfoDO workflowNodeInfoDO = workflowNodeInfoRepository.findById(req.getId()).orElseThrow(() -> new IllegalArgumentException("can't find workflow Node by id: " + req.getId()));
BeanUtils.copyProperties(req, workflowNodeInfoDO);
workflowNodeInfoDO.setGmtModified(new Date());
workflowNodeInfoRepository.saveAndFlush(workflowNodeInfoDO);
}
/**
* 保存 DAG 信息
* 这里会物理删除游离的节点信息
*/
@Transactional(rollbackOn = Exception.class)
public void saveWorkflowDAG(SaveWorkflowDAGRequest req) {
req.valid();
Long workflowId = req.getId();
WorkflowInfoDO workflowInfoDO = permissionCheck(workflowId, req.getAppId());
PEWorkflowDAG dag = req.getDag();
if (!WorkflowDAGUtils.valid(dag)) {
throw new PowerJobException("illegal DAG");
}
// 注意这里只会保存图相关的基础信息nodeId,jobId,jobName(nodeAlias)
// 其中 jobIdjobName 均以节点中的信息为准
List<Long> nodeIdList = Lists.newArrayList();
HashMap<Long, WorkflowNodeInfoDO> nodeIdNodInfoMap = Maps.newHashMap();
workflowNodeInfoRepository.findByWorkflowId(workflowId).forEach(
e -> nodeIdNodInfoMap.put(e.getId(), e)
);
for (PEWorkflowDAG.Node node : dag.getNodes()) {
WorkflowNodeInfoDO nodeInfo = nodeIdNodInfoMap.get(node.getNodeId());
if (nodeInfo == null) {
throw new PowerJobException("can't find node info by id :" + node.getNodeId());
}
if (!workflowInfoDO.getId().equals(nodeInfo.getWorkflowId())) {
throw new PowerJobException("node workflowId must be same to workflowId");
}
// 节点中的别名一定是非空的
node.setJobName(nodeInfo.getNodeAlias()).setJobId(nodeInfo.getJobId());
nodeIdList.add(node.getNodeId());
}
workflowInfoDO.setPeDAG(JSON.toJSONString(dag));
workflowInfoRepository.saveAndFlush(workflowInfoDO);
// 物理删除当前工作流中 游离 的节点 (不在工作流 DAG 中的节点)
int deleteCount = workflowNodeInfoRepository.deleteByWorkflowIdAndIdNotIn(workflowId, nodeIdList);
log.warn("[WorkflowService-{}]delete {} dissociative nodes of workflow", workflowId, deleteCount);
} }
@ -349,8 +342,8 @@ public class WorkflowService {
if (nodeInfo != null) { if (nodeInfo != null) {
node.setEnable(nodeInfo.getEnable()) node.setEnable(nodeInfo.getEnable())
.setSkipWhenFailed(nodeInfo.getSkipWhenFailed()) .setSkipWhenFailed(nodeInfo.getSkipWhenFailed())
.setJobName(nodeInfo.getNodeAlias()) .setNodeName(nodeInfo.getNodeName())
.setJobParams(nodeInfo.getNodeParams()); .setNodeParams(nodeInfo.getNodeParams());
} else { } else {
// 默认开启 并且 不允许失败跳过 // 默认开启 并且 不允许失败跳过

View File

@ -212,7 +212,7 @@ public class WorkflowDAGUtils {
// 创建节点 // 创建节点
peWorkflowDAG.getNodes().forEach(node -> { peWorkflowDAG.getNodes().forEach(node -> {
Long nodeId = node.getNodeId(); Long nodeId = node.getNodeId();
WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), node.getNodeId(), node.getJobId(), node.getJobName(), InstanceStatus.WAITING_DISPATCH.getV()); WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), node.getNodeId(), node.getJobId(), node.getNodeName(), InstanceStatus.WAITING_DISPATCH.getV());
id2Node.put(nodeId, n); id2Node.put(nodeId, n);
// 初始阶段每一个点都设为顶点 // 初始阶段每一个点都设为顶点

View File

@ -224,7 +224,7 @@ public class V3ToV4MigrateService {
nodeInfo.setAppId(workflowInfo.getAppId()); nodeInfo.setAppId(workflowInfo.getAppId());
nodeInfo.setJobId(jobInfo.getId()); nodeInfo.setJobId(jobInfo.getId());
// 默认启用不允许失败跳过参数和 Job 保持一致 // 默认启用不允许失败跳过参数和 Job 保持一致
nodeInfo.setNodeAlias(jobInfo.getJobName()); nodeInfo.setNodeName(jobInfo.getJobName());
nodeInfo.setNodeParams(jobInfo.getJobParams()); nodeInfo.setNodeParams(jobInfo.getJobParams());
nodeInfo.setEnable(true); nodeInfo.setEnable(true);
nodeInfo.setSkipWhenFailed(false); nodeInfo.setSkipWhenFailed(false);
@ -234,7 +234,7 @@ public class V3ToV4MigrateService {
nodeInfo = workflowNodeInfoRepository.saveAndFlush(nodeInfo); nodeInfo = workflowNodeInfoRepository.saveAndFlush(nodeInfo);
// 更新节点 ID // 更新节点 ID
node.setNodeId(nodeInfo.getId()); node.setNodeId(nodeInfo.getId());
node.setJobName(nodeInfo.getNodeAlias()); node.setNodeName(nodeInfo.getNodeName());
jobId2NodeIdMap.put(node.getJobId(), node.getNodeId()); jobId2NodeIdMap.put(node.getJobId(), node.getNodeId());
} }

View File

@ -5,6 +5,7 @@ import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator; import org.hibernate.annotations.GenericGenerator;
import tech.powerjob.server.common.constants.NodeType;
import javax.persistence.*; import javax.persistence.*;
import java.util.Date; import java.util.Date;
@ -33,16 +34,19 @@ public class WorkflowNodeInfoDO {
@Column(nullable = false) @Column(nullable = false)
private Long workflowId; private Long workflowId;
/**
* 节点类型 {@link NodeType}
*/
private Integer type;
/** /**
* 任务 ID * 任务 ID
*/ */
@Column(nullable = false) @Column(nullable = false)
private Long jobId; private Long jobId;
/** /**
* 节点默认为对应的任务名称 * 节点默认为对应的任务名称
*/ */
@Column(nullable = false) private String nodeName;
private String nodeAlias;
/** /**
* 节点参数 * 节点参数
*/ */
@ -58,6 +62,9 @@ public class WorkflowNodeInfoDO {
*/ */
@Column(nullable = false) @Column(nullable = false)
private Boolean skipWhenFailed; private Boolean skipWhenFailed;
@Lob
private String extra;
/** /**
* 创建时间 * 创建时间
*/ */

View File

@ -193,21 +193,9 @@ public class OpenAPIController {
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, initParams, delay)); return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, initParams, delay));
} }
@PostMapping(OpenAPIConstant.ADD_WORKFLOW_NODE) @PostMapping(OpenAPIConstant.SAVE_WORKFLOW_NODE)
public ResultDTO<List<WorkflowNodeInfoDO>> addWorkflowNode(@RequestBody List<AddWorkflowNodeRequest> request) { public ResultDTO<List<WorkflowNodeInfoDO>> saveWorkflowNode(@RequestBody List<SaveWorkflowNodeRequest> request) {
return ResultDTO.success(workflowService.addWorkflowNode(request)); return ResultDTO.success(workflowService.saveWorkflowNode(request));
}
@PostMapping(OpenAPIConstant.SAVE_WORKFLOW_DAG)
public ResultDTO<Void> saveWorkflowDAG(@RequestBody SaveWorkflowDAGRequest request) {
workflowService.saveWorkflowDAG(request);
return ResultDTO.success(null);
}
@PostMapping(OpenAPIConstant.MODIFY_WORKFLOW_NODE)
public ResultDTO<Void> modifyWorkflowNode(@RequestBody ModifyWorkflowNodeRequest request) {
workflowService.modifyWorkflowNode(request);
return ResultDTO.success(null);
} }
/* ************* Workflow Instance 区 ************* */ /* ************* Workflow Instance 区 ************* */

View File

@ -1,8 +1,6 @@
package tech.powerjob.server.web.controller; package tech.powerjob.server.web.controller;
import com.github.kfcfans.powerjob.common.request.http.AddWorkflowNodeRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowNodeRequest;
import com.github.kfcfans.powerjob.common.request.http.ModifyWorkflowNodeRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowDAGRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.response.ResultDTO;
import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.common.constants.SwitchableStatus;
@ -100,22 +98,11 @@ public class WorkflowController {
return ResultDTO.success(WorkflowInfoVO.from(workflowInfoDO)); return ResultDTO.success(WorkflowInfoVO.from(workflowInfoDO));
} }
@PostMapping("/addNode") @PostMapping("/saveNode")
public ResultDTO<List<WorkflowNodeInfoDO>> addWorkflowNode(@RequestBody List<AddWorkflowNodeRequest> request) { public ResultDTO<List<WorkflowNodeInfoDO>> addWorkflowNode(@RequestBody List<SaveWorkflowNodeRequest> request) {
return ResultDTO.success(workflowService.addWorkflowNode(request)); return ResultDTO.success(workflowService.saveWorkflowNode(request));
} }
@PostMapping("/saveDAG")
public ResultDTO<Void> saveWorkflowDAG(@RequestBody SaveWorkflowDAGRequest request) {
workflowService.saveWorkflowDAG(request);
return ResultDTO.success(null);
}
@PostMapping("/modifyNode")
public ResultDTO<Void> modifyWorkflowNode(@RequestBody ModifyWorkflowNodeRequest request) {
workflowService.modifyWorkflowNode(request);
return ResultDTO.success(null);
}
private static PageResult<WorkflowInfoVO> convertPage(Page<WorkflowInfoDO> originPage) { private static PageResult<WorkflowInfoVO> convertPage(Page<WorkflowInfoDO> originPage) {