feat: support nested workflow #266

This commit is contained in:
Echo009 2021-12-23 17:55:42 +08:00
parent c15cefc447
commit d996b34a54
10 changed files with 201 additions and 53 deletions

View File

@ -0,0 +1,5 @@
-- ----------------------------
-- workflow_instance_info: add a nullable reference to the parent workflow instance
-- ----------------------------
ALTER TABLE sx_workflow_instance_info
    ADD COLUMN parent_wf_instance_id BIGINT NULL DEFAULT NULL COMMENT '上层工作流实例ID';

View File

@ -3,7 +3,6 @@ package tech.powerjob.common.model;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
@ -59,6 +58,11 @@ public class PEWorkflowDAG implements Serializable {
* job id
*/
private Long jobId;
/**
* ID of the target workflow; only used by nested workflow nodes
* @see WorkflowNodeType#NESTED_WORKFLOW
*/
private Long wfId;
/**
* node name
*/

View File

@ -44,7 +44,6 @@ public class DispatchService {
private TransportService transportService;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
@Resource
private InstanceManager instanceManager;
@Resource

View File

@ -0,0 +1,35 @@
package tech.powerjob.server.core.helper;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
/**
 * Static helper that maps a workflow instance status onto the corresponding task instance status.
 *
 * @author Echo009
 * @since 2021/12/13
 */
public class StatusMappingHelper {

    private StatusMappingHelper() {
        // static utility, never instantiated
    }

    /**
     * Converts a workflow instance status into a task instance status.
     *
     * @param workflowInstanceStatus status of the workflow instance
     * @return the {@link InstanceStatus} of the same name for FAILED, SUCCEED, RUNNING and STOPPED;
     *         {@code null} for every other status
     */
    public static InstanceStatus toInstanceStatus(WorkflowInstanceStatus workflowInstanceStatus) {
        final InstanceStatus mapped;
        switch (workflowInstanceStatus) {
            case STOPPED:
                mapped = InstanceStatus.STOPPED;
                break;
            case RUNNING:
                mapped = InstanceStatus.RUNNING;
                break;
            case SUCCEED:
                mapped = InstanceStatus.SUCCEED;
                break;
            case FAILED:
                mapped = InstanceStatus.FAILED;
                break;
            default:
                // statuses without a task-instance counterpart are reported as null
                mapped = null;
                break;
        }
        return mapped;
    }
}

View File

@ -197,7 +197,7 @@ public class PowerScheduleService {
wfInfos.forEach(wfInfo -> {
// 1. 先生成调度记录防止不调度的情况发生
Long wfInstanceId = workflowInstanceManager.create(wfInfo, null, wfInfo.getNextTriggerTime());
Long wfInstanceId = workflowInstanceManager.create(wfInfo, null, wfInfo.getNextTriggerTime(), null);
// 2. 推入时间轮准备调度执行
long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis();

View File

@ -8,18 +8,16 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.WorkflowContextConstant;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.core.helper.StatusMappingHelper;
import tech.powerjob.server.core.lock.UseSegmentLock;
import tech.powerjob.server.core.service.UserService;
import tech.powerjob.server.core.service.WorkflowNodeHandleService;
@ -54,10 +52,6 @@ public class WorkflowInstanceManager {
@Resource
private AlarmCenter alarmCenter;
@Resource
private InstanceService instanceService;
@Resource
private DispatchService dispatchService;
@Resource
private IdGenerateService idGenerateService;
@Resource
private JobInfoRepository jobInfoRepository;
@ -84,12 +78,18 @@ public class WorkflowInstanceManager {
* @param expectTriggerTime 预计执行时间
* @return wfInstanceId
*/
public Long create(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime) {
public Long create(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime, Long parentWfInstanceId) {
Long wfId = wfInfo.getId();
Long wfInstanceId = idGenerateService.allocate();
// 构造实例信息
WorkflowInstanceInfoDO newWfInstance = constructWfInstance(wfInfo, initParams, expectTriggerTime, wfId, wfInstanceId);
if (parentWfInstanceId != null) {
// 处理子工作流
newWfInstance.setParentWfInstanceId(parentWfInstanceId);
// 直接透传上下文
newWfInstance.setWfContext(initParams);
}
PEWorkflowDAG dag = null;
try {
@ -121,7 +121,7 @@ public class WorkflowInstanceManager {
if (dag != null) {
newWfInstance.setDag(JSON.toJSONString(dag));
}
onWorkflowInstanceFailed(e.getMessage(), newWfInstance);
handleWfInstanceFinalStatus(newWfInstance, e.getMessage(), WorkflowInstanceStatus.FAILED);
}
return wfInstanceId;
}
@ -216,7 +216,7 @@ public class WorkflowInstanceManager {
// 并发度控制
int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS);
if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) {
onWorkflowInstanceFailed(String.format(SystemInstanceResult.TOO_MANY_INSTANCES, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), wfInstanceInfo);
handleWfInstanceFinalStatus(wfInstanceInfo, String.format(SystemInstanceResult.TOO_MANY_INSTANCES, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), WorkflowInstanceStatus.FAILED);
return;
}
}
@ -248,8 +248,8 @@ public class WorkflowInstanceManager {
workflowNodeHandleService.handleTaskNodes(readyNodes, dag, wfInstanceInfo);
log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId);
} catch (Exception e) {
log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);
onWorkflowInstanceFailed(e.getMessage(), wfInstanceInfo);
log.error("[Workflow-{}|{}] start workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);
handleWfInstanceFinalStatus(wfInstanceInfo, e.getMessage(), WorkflowInstanceStatus.FAILED);
}
}
@ -321,13 +321,13 @@ public class WorkflowInstanceManager {
// 任务失败 && 不允许失败跳过DAG 流程被打断整体失败
if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) {
log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId);
onWorkflowInstanceFailed(SystemInstanceResult.MIDDLE_JOB_FAILED, wfInstance);
handleWfInstanceFinalStatus(wfInstance, SystemInstanceResult.MIDDLE_JOB_FAILED, WorkflowInstanceStatus.FAILED);
return;
}
// 子任务被手动停止
if (status == InstanceStatus.STOPPED) {
updateWorkflowInstanceFinalStatus(wfInstance, SystemInstanceResult.MIDDLE_JOB_STOPPED, WorkflowInstanceStatus.STOPPED);
handleWfInstanceFinalStatus(wfInstance, SystemInstanceResult.MIDDLE_JOB_STOPPED, WorkflowInstanceStatus.STOPPED);
log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", wfId, wfInstanceId, instanceId);
return;
}
@ -342,7 +342,7 @@ public class WorkflowInstanceManager {
// 这里得重新更新一下因为 WorkflowDAGUtils#listReadyNodes 可能会更新节点状态
wfInstance.setDag(JSON.toJSONString(dag));
// 最终任务的结果作为整个 workflow 的结果
updateWorkflowInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED);
handleWfInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED);
log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
return;
}
@ -357,7 +357,7 @@ public class WorkflowInstanceManager {
if (readyNodes.isEmpty()) {
if (isFinish(dag)) {
wfInstance.setDag(JSON.toJSONString(dag));
updateWorkflowInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED);
handleWfInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED);
log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
return;
}
@ -369,11 +369,12 @@ public class WorkflowInstanceManager {
// 处理任务节点
workflowNodeHandleService.handleTaskNodes(readyNodes, dag, wfInstance);
} catch (Exception e) {
onWorkflowInstanceFailed("MOVE NEXT STEP FAILED: " + e.getMessage(), wfInstance);
handleWfInstanceFinalStatus(wfInstance, "MOVE NEXT STEP FAILED: " + e.getMessage(), WorkflowInstanceStatus.FAILED);
log.error("[Workflow-{}|{}] update failed.", wfId, wfInstanceId, e);
}
}
/**
* 更新工作流上下文
* fix : 得和其他操作工作流实例的方法用同一把锁才行不然有并发问题会导致节点状态被覆盖
@ -408,14 +409,46 @@ public class WorkflowInstanceManager {
}
private void updateWorkflowInstanceFinalStatus(WorkflowInstanceInfoDO wfInstance, String result, WorkflowInstanceStatus workflowInstanceStatus) {
/**
 * Writes the final status of a workflow instance and runs the follow-up actions.
 * <p>
 * Steps, in order:
 * <ol>
 *     <li>set status, result, finished time and modified time on the instance and save it;</li>
 *     <li>if the instance has a parent workflow instance: on SUCCEED push this instance's context into
 *     the parent's context, then call {@code move} on the parent with the mapped task status;</li>
 *     <li>on FAILED, send a failure alarm to the workflow's notify users (best-effort).</li>
 * </ol>
 *
 * @param wfInstance             workflow instance that reached a final status
 * @param result                 result text stored on the instance and reported to the parent / alarm
 * @param workflowInstanceStatus final status to persist
 */
private void handleWfInstanceFinalStatus(WorkflowInstanceInfoDO wfInstance, String result, WorkflowInstanceStatus workflowInstanceStatus) {
    wfInstance.setStatus(workflowInstanceStatus.getV());
    wfInstance.setResult(result);
    wfInstance.setFinishedTime(System.currentTimeMillis());
    wfInstance.setGmtModified(new Date());
    workflowInstanceInfoRepository.saveAndFlush(wfInstance);

    // nested workflow: this instance backs a node of a parent workflow instance
    if (wfInstance.getParentWfInstanceId() != null) {
        // hand the context back to the parent first, but only when the child succeeded
        if (workflowInstanceStatus == WorkflowInstanceStatus.SUCCEED) {
            HashMap<String, String> wfContext = JSON.parseObject(wfInstance.getWfContext(), new TypeReference<HashMap<String, String>>() {
            });
            updateWorkflowContext(wfInstance.getParentWfInstanceId(), wfContext);
        }
        // then let the parent advance; the child's instance id is passed as the finished node instance
        move(wfInstance.getWfInstanceId(), wfInstance.getParentWfInstanceId(), StatusMappingHelper.toInstanceStatus(workflowInstanceStatus), result);
    }

    // alarm
    if (workflowInstanceStatus == WorkflowInstanceStatus.FAILED) {
        try {
            workflowInfoRepository.findById(wfInstance.getWorkflowId()).ifPresent(wfInfo -> {
                WorkflowInstanceAlarm content = new WorkflowInstanceAlarm();
                BeanUtils.copyProperties(wfInfo, content);
                BeanUtils.copyProperties(wfInstance, content);
                content.setResult(result);
                List<UserInfoDO> userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds());
                alarmCenter.alarmFailed(content, userList);
            });
        } catch (Exception e) {
            // alarming stays best-effort: the final status is already persisted, so only record the problem
            log.warn("[Workflow-{}|{}] send workflow failure alarm failed.", wfInstance.getWorkflowId(), wfInstance.getWfInstanceId(), e);
        }
    }
}
private List<PEWorkflowDAG.Node> findControlNodes(List<PEWorkflowDAG.Node> readyNodes) {
return readyNodes.stream().filter(node -> {
WorkflowNodeType nodeType = WorkflowNodeType.of(node.getNodeType());
@ -432,30 +465,4 @@ public class WorkflowInstanceManager {
return true;
}
/**
 * Marks a workflow instance as FAILED, persists it and sends a failure alarm.
 * <p>
 * The instance gets the FAILED status, the given result, the current time as finished time and a fresh
 * modified date, and is saved immediately. Afterwards the owning workflow is looked up; if it exists, an
 * alarm is built from the workflow and instance properties and sent to the workflow's notify users.
 * Any exception raised while alarming is swallowed.
 *
 * @param result     failure description stored on the instance and placed in the alarm
 * @param wfInstance workflow instance to mark as failed
 */
private void onWorkflowInstanceFailed(String result, WorkflowInstanceInfoDO wfInstance) {
wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV());
wfInstance.setResult(result);
wfInstance.setFinishedTime(System.currentTimeMillis());
wfInstance.setGmtModified(new Date());
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
// alarm
try {
workflowInfoRepository.findById(wfInstance.getWorkflowId()).ifPresent(wfInfo -> {
WorkflowInstanceAlarm content = new WorkflowInstanceAlarm();
// instance properties are copied last, so they win over workflow properties of the same name
BeanUtils.copyProperties(wfInfo, content);
BeanUtils.copyProperties(wfInstance, content);
content.setResult(result);
List<UserInfoDO> userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds());
alarmCenter.alarmFailed(content, userList);
});
} catch (Exception ignore) {
// alarming is best-effort: a failure here must not affect the already persisted status
}
}
}

View File

@ -2,6 +2,7 @@ package tech.powerjob.server.core.workflow;
import com.alibaba.fastjson.JSON;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
@ -61,6 +62,10 @@ public class WorkflowInstanceService {
if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
throw new PowerJobException("workflow instance already stopped");
}
// 先终止父工作流
if (wfInstance.getParentWfInstanceId() != null) {
stopWorkflowInstance(wfInstance.getParentWfInstanceId(),appId);
}
// 停止所有已启动且未完成的服务
PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
// 遍历所有节点终止正在运行的
@ -70,8 +75,14 @@ public class WorkflowInstanceService {
log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId());
node.setStatus(InstanceStatus.STOPPED.getV());
node.setResult(SystemInstanceResult.STOPPED_BY_USER);
// 注意这里并不保证一定能终止正在运行的实例
instanceService.stopInstance(appId,node.getInstanceId());
// 特殊处理嵌套工作流节点
if (Objects.equals(node.getNodeType(), WorkflowNodeType.NESTED_WORKFLOW.getCode())) {
stopWorkflowInstance(node.getInstanceId(), appId);
//
} else {
// 注意这里并不保证一定能终止正在运行的实例
instanceService.stopInstance(appId, node.getInstanceId());
}
}
} catch (Exception e) {
log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSON.toJSONString(node), e);
@ -163,7 +174,7 @@ public class WorkflowInstanceService {
* 即处于 [失败且不允许跳过] 的节点
* 而且仅会操作工作流实例 DAG 中的节点信息状态result
* 并不会改变对应任务实例中的任何信息
*
* <p>
* 还是加把锁保平安 ~
*
* @param wfInstanceId 工作流实例 ID

View File

@ -55,7 +55,7 @@ public class WorkflowService {
/**
* 保存/修改工作流信息
*
* <p>
* 注意这里不会保存 DAG 信息
*
* @param req 请求
@ -271,7 +271,7 @@ public class WorkflowService {
WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay, null);
if (delay <= 0) {
workflowInstanceManager.start(wfInfo, wfInstanceId);
} else {

View File

@ -0,0 +1,83 @@
package tech.powerjob.server.core.workflow.hanlder.impl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import javax.annotation.Resource;
import java.util.Date;
/**
* @author Echo009
* @since 2021/12/13
*/
@Component
@Slf4j
public class NestedWorkflowNodeHandler implements TaskNodeHandler {
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
@Override
public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
// check
Long wfId = node.getWfId();
WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null);
if (targetWf == null || targetWf.getStatus() == SwitchableStatus.DELETED.getV()) {
if (targetWf == null) {
log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getWfId());
} else {
log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) has been deleted!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getWfId());
}
throw new PowerJobException("invalid nested workflow node," + node.getNodeId());
}
if (node.getInstanceId() != null) {
// 处理重试的情形不需要创建实例仅需要更改对应实例的状态
WorkflowInstanceInfoDO wfInstance = workflowInstanceInfoRepository.findByWfInstanceId(node.getInstanceId()).orElse(null);
if (wfInstance == null) {
log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow instance({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getInstanceId());
throw new PowerJobException("invalid workflow instance id" + node.getNodeId());
}
wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
wfInstance.setGmtModified(new Date());
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
} else {
// 透传当前的上下文创建新的工作流实例
String wfContext = wfInstanceInfo.getWfContext();
Long instanceId = workflowInstanceManager.create(targetWf, wfContext, System.currentTimeMillis(), wfInstanceInfo.getWfInstanceId());
node.setInstanceId(instanceId);
}
node.setStartTime(CommonUtils.formatTime(System.currentTimeMillis()));
node.setStatus(InstanceStatus.RUNNING.getV());
}
@Override
public void startTaskInstance(PEWorkflowDAG.Node node) {
Long wfId = node.getWfId();
WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null);
workflowInstanceManager.start(targetWf, node.getInstanceId());
}
@Override
public WorkflowNodeType matchingType() {
return WorkflowNodeType.NESTED_WORKFLOW;
}
}

View File

@ -33,6 +33,10 @@ public class WorkflowInstanceInfoDO {
* workflowInstanceId任务实例表都使用单独的ID作为主键以支持潜在的分表需求
*/
private Long wfInstanceId;
/**
* 上层工作流实例 ID 用于支持工作流嵌套
*/
private Long parentWfInstanceId;
private Long workflowId;
/**