mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
refactor: simplify processing logic of workflow maintenance
This commit is contained in:
parent
0febeea298
commit
ab8c8b5f0f
@ -70,20 +70,17 @@ class TestWorkflow extends ClientInitializer {
|
||||
// 创建节点
|
||||
SaveWorkflowNodeRequest saveWorkflowNodeRequest1 = new SaveWorkflowNodeRequest();
|
||||
saveWorkflowNodeRequest1.setJobId(1L);
|
||||
saveWorkflowNodeRequest1.setWorkflowId(req.getId());
|
||||
saveWorkflowNodeRequest1.setNodeName("DAG-Node-1");
|
||||
saveWorkflowNodeRequest1.setType(WorkflowNodeType.JOB);
|
||||
|
||||
SaveWorkflowNodeRequest saveWorkflowNodeRequest2 = new SaveWorkflowNodeRequest();
|
||||
saveWorkflowNodeRequest2.setJobId(1L);
|
||||
saveWorkflowNodeRequest2.setWorkflowId(req.getId());
|
||||
saveWorkflowNodeRequest2.setNodeName("DAG-Node-2");
|
||||
saveWorkflowNodeRequest2.setType(WorkflowNodeType.JOB);
|
||||
|
||||
|
||||
SaveWorkflowNodeRequest saveWorkflowNodeRequest3 = new SaveWorkflowNodeRequest();
|
||||
saveWorkflowNodeRequest3.setJobId(1L);
|
||||
saveWorkflowNodeRequest3.setWorkflowId(req.getId());
|
||||
saveWorkflowNodeRequest3.setNodeName("DAG-Node-3");
|
||||
saveWorkflowNodeRequest3.setType(WorkflowNodeType.JOB);
|
||||
|
||||
@ -97,9 +94,9 @@ class TestWorkflow extends ClientInitializer {
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(nodeList.get(0).getId(), 1L, "DAG-Node-1"));
|
||||
nodes.add(new PEWorkflowDAG.Node(nodeList.get(1).getId(), 1L, "DAG-Node-2"));
|
||||
nodes.add(new PEWorkflowDAG.Node(nodeList.get(2).getId(), 1L, "DAG-Node-3"));
|
||||
nodes.add(new PEWorkflowDAG.Node(nodeList.get(0).getId()));
|
||||
nodes.add(new PEWorkflowDAG.Node(nodeList.get(1).getId()));
|
||||
nodes.add(new PEWorkflowDAG.Node(nodeList.get(2).getId()));
|
||||
|
||||
edges.add(new PEWorkflowDAG.Edge(nodeList.get(0).getId(), nodeList.get(1).getId()));
|
||||
edges.add(new PEWorkflowDAG.Edge(nodeList.get(1).getId(), nodeList.get(2).getId()));
|
||||
|
@ -45,6 +45,8 @@ public class SystemInstanceResult {
|
||||
public static final String MIDDLE_JOB_FAILED = "middle job failed";
|
||||
public static final String MIDDLE_JOB_STOPPED = "middle job stopped by user";
|
||||
public static final String CAN_NOT_FIND_JOB = "can't find some job";
|
||||
public static final String CAN_NOT_FIND_NODE = "can't find some node";
|
||||
public static final String ILLEGAL_NODE = "illegal node info";
|
||||
|
||||
/**
|
||||
* 没有启用的节点
|
||||
|
@ -45,15 +45,22 @@ public class PEWorkflowDAG implements Serializable {
|
||||
* @since 20210128
|
||||
*/
|
||||
private Long nodeId;
|
||||
/* Instance running param, which is not required by DAG. */
|
||||
|
||||
/**
|
||||
* note type
|
||||
* @since 20210316
|
||||
*/
|
||||
private Integer nodeType;
|
||||
/**
|
||||
* job id
|
||||
*/
|
||||
private Long jobId;
|
||||
/**
|
||||
* node name
|
||||
*/
|
||||
private String nodeName;
|
||||
|
||||
/* Instance running param, which is not required by DAG. */
|
||||
|
||||
@JsonSerialize(using= ToStringSerializer.class)
|
||||
private Long instanceId;
|
||||
|
||||
@ -69,11 +76,8 @@ public class PEWorkflowDAG implements Serializable {
|
||||
|
||||
private Boolean skipWhenFailed;
|
||||
|
||||
|
||||
public Node(Long nodeId,Long jobId, String nodeName) {
|
||||
public Node(Long nodeId) {
|
||||
this.nodeId = nodeId;
|
||||
this.jobId = jobId;
|
||||
this.nodeName = nodeName;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,20 +7,18 @@ import lombok.Data;
|
||||
|
||||
|
||||
/**
|
||||
* 新增工作流节点信息请求
|
||||
* 保存工作流节点信息请求
|
||||
* 工作流节点的
|
||||
*
|
||||
* @author zenggonggu
|
||||
* @since 2021/02/02
|
||||
*/
|
||||
@Data
|
||||
|
||||
public class SaveWorkflowNodeRequest {
|
||||
|
||||
private Long id;
|
||||
|
||||
private Long appId;
|
||||
|
||||
private Long workflowId;
|
||||
/**
|
||||
* 节点类型(默认为任务节点)
|
||||
*/
|
||||
|
@ -14,6 +14,7 @@ import tech.powerjob.common.WorkflowContextConstant;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.enums.WorkflowInstanceStatus;
|
||||
import tech.powerjob.common.enums.WorkflowNodeType;
|
||||
import tech.powerjob.common.model.PEWorkflowDAG;
|
||||
import tech.powerjob.common.utils.JsonUtils;
|
||||
import tech.powerjob.server.common.constants.SwitchableStatus;
|
||||
@ -83,8 +84,79 @@ public class WorkflowInstanceManager {
|
||||
|
||||
Long wfId = wfInfo.getId();
|
||||
Long wfInstanceId = idGenerateService.allocate();
|
||||
// 构造实例信息
|
||||
WorkflowInstanceInfoDO newWfInstance = constructWfInstance(wfInfo, initParams, expectTriggerTime, wfId, wfInstanceId);
|
||||
|
||||
PEWorkflowDAG dag;
|
||||
try {
|
||||
dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
|
||||
// 校验 DAG 信息
|
||||
if (!WorkflowDAGUtils.valid(dag)) {
|
||||
log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you has modified the DAG info directly in database!", wfId, wfInstanceId);
|
||||
throw new PowerJobException(SystemInstanceResult.INVALID_DAG);
|
||||
}
|
||||
// 初始化节点信息
|
||||
initNodeInfo(dag);
|
||||
newWfInstance.setDag(JSON.toJSONString(dag));
|
||||
// 最后检查工作流中的任务是否均处于可用状态(没有被删除)
|
||||
Set<Long> allJobIds = Sets.newHashSet();
|
||||
dag.getNodes().forEach(node -> {
|
||||
allJobIds.add(node.getJobId());
|
||||
// 将节点的初始状态置为等待派发
|
||||
node.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
|
||||
});
|
||||
int needNum = allJobIds.size();
|
||||
long dbNum = jobInfoRepository.countByAppIdAndStatusInAndIdIn(wfInfo.getAppId(), Sets.newHashSet(SwitchableStatus.ENABLE.getV(), SwitchableStatus.DISABLE.getV()), allJobIds);
|
||||
log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum);
|
||||
if (dbNum < allJobIds.size()) {
|
||||
log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", wfId, wfInstanceId, needNum, dbNum);
|
||||
throw new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_JOB);
|
||||
}
|
||||
workflowInstanceInfoRepository.saveAndFlush(newWfInstance);
|
||||
} catch (Exception e) {
|
||||
onWorkflowInstanceFailed(e.getMessage(), newWfInstance);
|
||||
}
|
||||
return wfInstanceId;
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化节点信息
|
||||
*/
|
||||
private void initNodeInfo(PEWorkflowDAG dag) {
|
||||
for (PEWorkflowDAG.Node node : dag.getNodes()) {
|
||||
WorkflowNodeInfoDO workflowNodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(()->new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_NODE));
|
||||
// 任务节点,需初始化 是否启用、是否允许失败跳过、节点参数 等信息
|
||||
if (workflowNodeInfo.getType() == null || workflowNodeInfo.getType() == WorkflowNodeType.JOB.getCode()) {
|
||||
// 任务节点缺失任务信息
|
||||
if (workflowNodeInfo.getJobId() == null) {
|
||||
throw new PowerJobException(SystemInstanceResult.ILLEGAL_NODE);
|
||||
}
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(workflowNodeInfo.getJobId()).orElseThrow(()->new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_JOB));
|
||||
|
||||
node.setNodeType(WorkflowNodeType.JOB.getCode());
|
||||
// 初始化任务相关信息
|
||||
node.setJobId(workflowNodeInfo.getJobId())
|
||||
.setNodeName(workflowNodeInfo.getNodeName())
|
||||
.setEnable(workflowNodeInfo.getEnable())
|
||||
.setSkipWhenFailed(workflowNodeInfo.getSkipWhenFailed());
|
||||
|
||||
if (!StringUtils.isBlank(workflowNodeInfo.getNodeParams())) {
|
||||
node.setNodeParams(workflowNodeInfo.getNodeParams());
|
||||
} else {
|
||||
node.setNodeParams(jobInfo.getJobParams());
|
||||
}
|
||||
} else {
|
||||
// 非任务节点
|
||||
node.setNodeType(workflowNodeInfo.getType());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构造工作流实例,并初始化基础信息(不包括 DAG )
|
||||
*/
|
||||
private WorkflowInstanceInfoDO constructWfInstance(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime, Long wfId, Long wfInstanceId) {
|
||||
|
||||
// 创建 并初始化 DAG 信息
|
||||
Date now = new Date();
|
||||
WorkflowInstanceInfoDO newWfInstance = new WorkflowInstanceInfoDO();
|
||||
newWfInstance.setAppId(wfInfo.getAppId());
|
||||
@ -101,43 +173,7 @@ public class WorkflowInstanceManager {
|
||||
|
||||
newWfInstance.setGmtCreate(now);
|
||||
newWfInstance.setGmtModified(now);
|
||||
|
||||
// 校验 DAG 信息
|
||||
PEWorkflowDAG dag;
|
||||
try {
|
||||
dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
|
||||
// 校验
|
||||
if (!WorkflowDAGUtils.valid(dag)) {
|
||||
throw new PowerJobException(SystemInstanceResult.INVALID_DAG);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you has modified the DAG info directly in database!", wfId, wfInstanceId);
|
||||
onWorkflowInstanceFailed(SystemInstanceResult.INVALID_DAG, newWfInstance);
|
||||
return newWfInstance.getWfInstanceId();
|
||||
}
|
||||
// 校验合法性(工作是否存在且启用)
|
||||
Set<Long> allJobIds = Sets.newHashSet();
|
||||
dag.getNodes().forEach(node -> {
|
||||
allJobIds.add(node.getJobId());
|
||||
// 将节点的初始状态置为等待派发
|
||||
node.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
|
||||
});
|
||||
int needNum = allJobIds.size();
|
||||
// 检查工作流中的任务是否均处于可用状态(没有被删除)
|
||||
long dbNum = jobInfoRepository.countByAppIdAndStatusInAndIdIn(wfInfo.getAppId(), Sets.newHashSet(SwitchableStatus.ENABLE.getV(), SwitchableStatus.DISABLE.getV()), allJobIds);
|
||||
log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum);
|
||||
// 先 set 一次,异常的话直接存这个信息
|
||||
newWfInstance.setDag(JSON.toJSONString(dag));
|
||||
if (dbNum < allJobIds.size()) {
|
||||
log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", wfId, wfInstanceId, needNum, dbNum);
|
||||
onWorkflowInstanceFailed(SystemInstanceResult.CAN_NOT_FIND_JOB, newWfInstance);
|
||||
} else {
|
||||
initNodeInfo(dag);
|
||||
// 再 set 一次,此时工作流中的节点信息已经完全初始化
|
||||
newWfInstance.setDag(JSON.toJSONString(dag));
|
||||
workflowInstanceInfoRepository.saveAndFlush(newWfInstance);
|
||||
}
|
||||
return wfInstanceId;
|
||||
return newWfInstance;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -373,41 +409,6 @@ public class WorkflowInstanceManager {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化节点信息
|
||||
*
|
||||
* @param dag pe dag
|
||||
* @since 20210205
|
||||
*/
|
||||
private void initNodeInfo(PEWorkflowDAG dag) {
|
||||
// 初始化节点信息(是否启用、是否允许失败跳过、节点参数)
|
||||
for (PEWorkflowDAG.Node node : dag.getNodes()) {
|
||||
Optional<WorkflowNodeInfoDO> nodeInfoOpt = workflowNodeInfoRepository.findById(node.getNodeId());
|
||||
// 不考虑极端情况
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new);
|
||||
|
||||
if (!nodeInfoOpt.isPresent()) {
|
||||
// 默认启用 + 不允许失败跳过
|
||||
node.setEnable(true)
|
||||
.setSkipWhenFailed(false)
|
||||
.setNodeParams(jobInfo.getJobParams());
|
||||
} else {
|
||||
WorkflowNodeInfoDO nodeInfo = nodeInfoOpt.get();
|
||||
// 使用节点别名覆盖
|
||||
node.setNodeName(nodeInfo.getNodeName())
|
||||
.setEnable(nodeInfo.getEnable())
|
||||
.setSkipWhenFailed(nodeInfo.getSkipWhenFailed());
|
||||
// 如果节点中指定了参数信息,则取节点的,否则取 Job 上的
|
||||
if (!StringUtils.isBlank(nodeInfo.getNodeParams())) {
|
||||
node.setNodeParams(nodeInfo.getNodeParams());
|
||||
} else {
|
||||
node.setNodeParams(jobInfo.getJobParams());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 运行任务实例
|
||||
* 需要将创建和运行任务实例分离,否则在秒失败情况下,会发生DAG覆盖更新的问题
|
||||
|
@ -1,11 +1,6 @@
|
||||
package tech.powerjob.server.core.workflow;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import tech.powerjob.common.PowerJobException;
|
||||
import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.model.PEWorkflowDAG;
|
||||
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
|
||||
import tech.powerjob.common.request.http.SaveWorkflowRequest;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -13,6 +8,11 @@ import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
import tech.powerjob.common.PowerJobException;
|
||||
import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.model.PEWorkflowDAG;
|
||||
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
|
||||
import tech.powerjob.common.request.http.SaveWorkflowRequest;
|
||||
import tech.powerjob.server.common.SJ;
|
||||
import tech.powerjob.server.common.constants.SwitchableStatus;
|
||||
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
|
||||
@ -30,7 +30,6 @@ import javax.annotation.Resource;
|
||||
import javax.transaction.Transactional;
|
||||
import java.text.ParseException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Workflow 服务
|
||||
@ -98,8 +97,7 @@ public class WorkflowService {
|
||||
wf = workflowInfoRepository.saveAndFlush(wf);
|
||||
wfId = wf.getId();
|
||||
}
|
||||
|
||||
wf.setPeDAG(validateAndConvert2String(wfId,req.getDag()));
|
||||
wf.setPeDAG(validateAndConvert2String(wfId, req.getDag()));
|
||||
workflowInfoRepository.saveAndFlush(wf);
|
||||
return wfId;
|
||||
}
|
||||
@ -108,7 +106,7 @@ public class WorkflowService {
|
||||
* 保存 DAG 信息
|
||||
* 这里会物理删除游离的节点信息
|
||||
*/
|
||||
private String validateAndConvert2String(Long wfId,PEWorkflowDAG dag) {
|
||||
private String validateAndConvert2String(Long wfId, PEWorkflowDAG dag) {
|
||||
if (dag == null || CollectionUtils.isEmpty(dag.getNodes())) {
|
||||
return "{}";
|
||||
}
|
||||
@ -118,6 +116,7 @@ public class WorkflowService {
|
||||
// 注意:这里只会保存图相关的基础信息,nodeId,jobId,jobName(nodeAlias)
|
||||
// 其中 jobId,jobName 均以节点中的信息为准
|
||||
List<Long> nodeIdList = Lists.newArrayList();
|
||||
List<PEWorkflowDAG.Node> newNodes = Lists.newArrayList();
|
||||
for (PEWorkflowDAG.Node node : dag.getNodes()) {
|
||||
WorkflowNodeInfoDO nodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new PowerJobException("can't find node info by id :" + node.getNodeId()));
|
||||
// 更新工作流 ID
|
||||
@ -129,15 +128,13 @@ public class WorkflowService {
|
||||
if (!wfId.equals(nodeInfo.getWorkflowId())) {
|
||||
throw new PowerJobException("can't use another workflow's node");
|
||||
}
|
||||
// 节点中的名称信息一定是非空的
|
||||
node.setNodeName(nodeInfo.getNodeName()).setJobId(nodeInfo.getJobId());
|
||||
// 清空其他信息
|
||||
node.setEnable(null).setSkipWhenFailed(null).setInstanceId(null).setResult(null);
|
||||
// 只保存节点的 ID 信息,清空其他信息
|
||||
newNodes.add(new PEWorkflowDAG.Node(node.getNodeId()));
|
||||
nodeIdList.add(node.getNodeId());
|
||||
}
|
||||
|
||||
dag.setNodes(newNodes);
|
||||
int deleteCount = workflowNodeInfoRepository.deleteByWorkflowIdAndIdNotIn(wfId, nodeIdList);
|
||||
log.warn("[WorkflowService-{}]delete {} dissociative nodes of workflow",wfId, deleteCount);
|
||||
log.warn("[WorkflowService-{}]delete {} dissociative nodes of workflow", wfId, deleteCount);
|
||||
return JSON.toJSONString(dag);
|
||||
}
|
||||
|
||||
@ -295,15 +292,13 @@ public class WorkflowService {
|
||||
if (CollectionUtils.isEmpty(workflowNodeRequestList)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
validate(workflowNodeRequestList);
|
||||
final Long appId = workflowNodeRequestList.get(0).getAppId();
|
||||
List<WorkflowNodeInfoDO> res = new ArrayList<>(workflowNodeRequestList.size());
|
||||
// 记录变更过任务的节点
|
||||
List<WorkflowNodeInfoDO> changeJobNodeList = new ArrayList<>(workflowNodeRequestList.size());
|
||||
//
|
||||
Long wfId = null;
|
||||
for (SaveWorkflowNodeRequest req : workflowNodeRequestList) {
|
||||
if (req.getWorkflowId() != null) {
|
||||
wfId = req.getWorkflowId();
|
||||
req.valid();
|
||||
// 必须位于同一个 APP 下
|
||||
if (!appId.equals(req.getAppId())) {
|
||||
throw new PowerJobException("node list must are in the same app");
|
||||
}
|
||||
WorkflowNodeInfoDO workflowNodeInfo;
|
||||
if (req.getId() != null) {
|
||||
@ -313,11 +308,11 @@ public class WorkflowService {
|
||||
workflowNodeInfo.setGmtCreate(new Date());
|
||||
}
|
||||
JobInfoDO jobInfoDO = jobInfoRepository.findById(req.getJobId()).orElseThrow(() -> new IllegalArgumentException("can't find job by id: " + req.getJobId()));
|
||||
// 变更任务的节点
|
||||
if (workflowNodeInfo.getJobId() != null && !workflowNodeInfo.getJobId().equals(req.getJobId())) {
|
||||
changeJobNodeList.add(workflowNodeInfo);
|
||||
if (!jobInfoDO.getAppId().equals(appId)) {
|
||||
throw new PowerJobException("Permission Denied! can't use other app's job!");
|
||||
}
|
||||
BeanUtils.copyProperties(req, workflowNodeInfo);
|
||||
workflowNodeInfo.setType(req.getType().getCode());
|
||||
// 如果名称为空则默认取任务名称
|
||||
if (StringUtils.isEmpty(workflowNodeInfo.getNodeName())) {
|
||||
workflowNodeInfo.setNodeName(jobInfoDO.getJobName());
|
||||
@ -326,73 +321,9 @@ public class WorkflowService {
|
||||
workflowNodeInfo = workflowNodeInfoRepository.saveAndFlush(workflowNodeInfo);
|
||||
res.add(workflowNodeInfo);
|
||||
}
|
||||
// 同步变更 DAG 中的任务信息
|
||||
if (wfId != null && !changeJobNodeList.isEmpty()) {
|
||||
updateDagJobInfo(changeJobNodeList, wfId);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
private void updateDagJobInfo(List<WorkflowNodeInfoDO> changeJobNodeList, Long wfId) {
|
||||
WorkflowInfoDO workflowInfo = workflowInfoRepository.findById(wfId).orElseGet(WorkflowInfoDO::new);
|
||||
PEWorkflowDAG dag = JSON.parseObject(workflowInfo.getPeDAG(), PEWorkflowDAG.class);
|
||||
if (!CollectionUtils.isEmpty(dag.getNodes())) {
|
||||
Map<Long, PEWorkflowDAG.Node> nodeId2NodeMap = dag.getNodes().stream().collect(Collectors.toMap(PEWorkflowDAG.Node::getNodeId, e -> e));
|
||||
for (WorkflowNodeInfoDO nodeInfo : changeJobNodeList) {
|
||||
PEWorkflowDAG.Node node = nodeId2NodeMap.get(nodeInfo.getId());
|
||||
if (node != null) {
|
||||
node.setJobId(nodeInfo.getJobId());
|
||||
}
|
||||
}
|
||||
}
|
||||
workflowInfo.setPeDAG(JSON.toJSONString(dag));
|
||||
workflowInfo.setGmtModified(new Date());
|
||||
workflowInfoRepository.saveAndFlush(workflowInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验合法性
|
||||
* - 必须在同一个 APP 下
|
||||
* - 工作流 ID 只能都非空,或者都为 null
|
||||
* - 对应的 job 信息是否存在
|
||||
*
|
||||
* @param workflowNodeRequestList 非空工作流节点列表
|
||||
*/
|
||||
@SuppressWarnings("squid:S3776")
|
||||
private void validate(List<SaveWorkflowNodeRequest> workflowNodeRequestList) {
|
||||
Long appId = workflowNodeRequestList.get(0).getAppId();
|
||||
Long wfId = null;
|
||||
for (SaveWorkflowNodeRequest saveWorkflowNodeRequest : workflowNodeRequestList) {
|
||||
saveWorkflowNodeRequest.valid();
|
||||
// 必须位于同一个 APP 下
|
||||
if (!appId.equals(saveWorkflowNodeRequest.getAppId())) {
|
||||
throw new PowerJobException("node list must are in the same app");
|
||||
}
|
||||
if (saveWorkflowNodeRequest.getWorkflowId() != null) {
|
||||
if (wfId == null) {
|
||||
wfId = saveWorkflowNodeRequest.getWorkflowId();
|
||||
} else {
|
||||
// 工作流的 ID 必须一致
|
||||
if (!wfId.equals(saveWorkflowNodeRequest.getWorkflowId())) {
|
||||
throw new PowerJobException("node list must are in the same workflow");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 存在部分节点有工作流 ID ,部分没有
|
||||
if (wfId != null) {
|
||||
throw new PowerJobException("can't create and update node info in the same request");
|
||||
}
|
||||
}
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(saveWorkflowNodeRequest.getJobId()).orElseThrow(() -> new IllegalArgumentException("can't find job by id: " + saveWorkflowNodeRequest.getJobId()));
|
||||
if (!jobInfo.getAppId().equals(appId)) {
|
||||
throw new PowerJobException("Permission Denied! can't use other app's job!");
|
||||
}
|
||||
}
|
||||
if (wfId != null) {
|
||||
permissionCheck(wfId, appId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void fillWorkflow(WorkflowInfoDO wfInfo) {
|
||||
|
||||
|
@ -30,8 +30,8 @@ public class DAGTest {
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
// 测试图1: 1 -> 2 -> 1,理论上报错
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1"));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2"));
|
||||
nodes.add(new PEWorkflowDAG.Node(1L));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 1L));
|
||||
Assert.assertFalse(WorkflowDAGUtils.valid(new PEWorkflowDAG(nodes, edges)));
|
||||
@ -43,10 +43,10 @@ public class DAGTest {
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1"));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2"));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3"));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4"));
|
||||
nodes.add(new PEWorkflowDAG.Node(1L));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 3L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 4L));
|
||||
@ -78,10 +78,10 @@ public class DAGTest {
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1"));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2"));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3"));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4"));
|
||||
nodes.add(new PEWorkflowDAG.Node(1L));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 3L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 4L));
|
||||
|
||||
@ -102,10 +102,10 @@ public class DAGTest {
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
// 测试图4:(双顶点 单个环) 1 -> 3 -> 1, 2 -> 4
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1"));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2"));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3"));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4"));
|
||||
nodes.add(new PEWorkflowDAG.Node(1L));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 3L));
|
||||
edges.add(new PEWorkflowDAG.Edge(3L, 1L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 4L));
|
||||
@ -130,12 +130,12 @@ public class DAGTest {
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1"));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2"));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3"));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4"));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5"));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6"));
|
||||
nodes.add(new PEWorkflowDAG.Node(1L));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 5L));
|
||||
edges.add(new PEWorkflowDAG.Edge(5L, 6L));
|
||||
@ -167,15 +167,15 @@ public class DAGTest {
|
||||
List<PEWorkflowDAG.Node> nodes1 = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges1 = Lists.newLinkedList();
|
||||
|
||||
nodes1.add(new PEWorkflowDAG.Node(1L, 1L, "1"));
|
||||
nodes1.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false));
|
||||
nodes1.add(new PEWorkflowDAG.Node(3L, 3L, "3"));
|
||||
nodes1.add(new PEWorkflowDAG.Node(4L, 4L, "4"));
|
||||
nodes1.add(new PEWorkflowDAG.Node(5L, 5L, "5"));
|
||||
nodes1.add(new PEWorkflowDAG.Node(6L, 6L, "6").setEnable(false));
|
||||
nodes1.add(new PEWorkflowDAG.Node(7L, 7L, "7").setEnable(false));
|
||||
nodes1.add(new PEWorkflowDAG.Node(8L, 8L, "8").setEnable(false));
|
||||
nodes1.add(new PEWorkflowDAG.Node(9L, 9L, "9"));
|
||||
nodes1.add(new PEWorkflowDAG.Node(1L));
|
||||
nodes1.add(new PEWorkflowDAG.Node(2L).setEnable(false));
|
||||
nodes1.add(new PEWorkflowDAG.Node(3L));
|
||||
nodes1.add(new PEWorkflowDAG.Node(4L));
|
||||
nodes1.add(new PEWorkflowDAG.Node(5L));
|
||||
nodes1.add(new PEWorkflowDAG.Node(6L).setEnable(false));
|
||||
nodes1.add(new PEWorkflowDAG.Node(7L).setEnable(false));
|
||||
nodes1.add(new PEWorkflowDAG.Node(8L).setEnable(false));
|
||||
nodes1.add(new PEWorkflowDAG.Node(9L));
|
||||
edges1.add(new PEWorkflowDAG.Edge(1L, 3L));
|
||||
edges1.add(new PEWorkflowDAG.Edge(2L, 4L));
|
||||
edges1.add(new PEWorkflowDAG.Edge(4L, 5L));
|
||||
@ -209,12 +209,12 @@ public class DAGTest {
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3"));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4"));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6"));
|
||||
nodes.add(new PEWorkflowDAG.Node(1L).setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L).setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L).setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 3L));
|
||||
edges.add(new PEWorkflowDAG.Edge(3L, 4L));
|
||||
@ -248,13 +248,13 @@ public class DAGTest {
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setStatus(InstanceStatus.FAILED.getV()));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3").setStatus(InstanceStatus.SUCCEED.getV()));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4"));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6"));
|
||||
nodes.add(new PEWorkflowDAG.Node(7L, 7L, "7"));
|
||||
nodes.add(new PEWorkflowDAG.Node(1L).setStatus(InstanceStatus.FAILED.getV()));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L).setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L).setStatus(InstanceStatus.SUCCEED.getV()));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L).setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L));
|
||||
nodes.add(new PEWorkflowDAG.Node(7L));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 4L));
|
||||
edges.add(new PEWorkflowDAG.Edge(3L, 4L));
|
||||
@ -291,12 +291,12 @@ public class DAGTest {
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setStatus(InstanceStatus.FAILED.getV()));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5"));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6"));
|
||||
nodes.add(new PEWorkflowDAG.Node(1L).setStatus(InstanceStatus.FAILED.getV()));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L).setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L).setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L).setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 5L));
|
||||
edges.add(new PEWorkflowDAG.Edge(5L, 6L));
|
||||
@ -334,13 +334,13 @@ public class DAGTest {
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1").setStatus(InstanceStatus.FAILED.getV()));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L, 3L, "3").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L, 4L, "4").setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L, 5L, "5"));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L, 6L, "6"));
|
||||
nodes.add(new PEWorkflowDAG.Node(7L, 7L, "7"));
|
||||
nodes.add(new PEWorkflowDAG.Node(1L).setStatus(InstanceStatus.FAILED.getV()));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L).setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(3L).setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(4L).setEnable(false));
|
||||
nodes.add(new PEWorkflowDAG.Node(5L));
|
||||
nodes.add(new PEWorkflowDAG.Node(6L));
|
||||
nodes.add(new PEWorkflowDAG.Node(7L));
|
||||
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
|
||||
edges.add(new PEWorkflowDAG.Edge(2L, 5L));
|
||||
edges.add(new PEWorkflowDAG.Edge(5L, 6L));
|
||||
|
Loading…
x
Reference in New Issue
Block a user