diff --git a/others/sql/v4.0.1-v4.1.0.sql b/others/sql/v4.0.1-v4.1.0.sql new file mode 100644 index 00000000..a9bb5c64 --- /dev/null +++ b/others/sql/v4.0.1-v4.1.0.sql @@ -0,0 +1,5 @@ +-- ---------------------------- +-- Table change for workflow_instance_info +-- ---------------------------- +alter table sx_workflow_instance_info + add parent_wf_instance_id bigint default null null comment '上层工作流实例ID'; diff --git a/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java b/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java index 64b53d5c..a8a5ab88 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/model/PEWorkflowDAG.java @@ -3,7 +3,6 @@ package tech.powerjob.common.model; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; import com.google.common.collect.Lists; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; @@ -59,6 +58,11 @@ public class PEWorkflowDAG implements Serializable { * job id */ private Long jobId; + /** + * workflow id,support for nested workflow + * @see WorkflowNodeType#NESTED_WORKFLOW + */ + private Long wfId; /** * node name */ diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java index 248f00e4..fae8fe59 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java @@ -44,7 +44,6 @@ public class DispatchService { private TransportService transportService; @Resource private WorkerClusterQueryService workerClusterQueryService; - @Resource private InstanceManager instanceManager; @Resource diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/helper/StatusMappingHelper.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/helper/StatusMappingHelper.java new file mode 100644 index 00000000..10789625 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/helper/StatusMappingHelper.java @@ -0,0 +1,35 @@ +package tech.powerjob.server.core.helper; + +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.WorkflowInstanceStatus; + +/** + * @author Echo009 + * @since 2021/12/13 + */ +public class StatusMappingHelper { + + private StatusMappingHelper(){ + + } + + /** + * 工作流实例状态转任务实例状态 + */ + public static InstanceStatus toInstanceStatus(WorkflowInstanceStatus workflowInstanceStatus) { + switch (workflowInstanceStatus) { + case FAILED: + return InstanceStatus.FAILED; + case SUCCEED: + return InstanceStatus.SUCCEED; + case RUNNING: + return InstanceStatus.RUNNING; + case STOPPED: + return InstanceStatus.STOPPED; + default: + return null; + } + } + + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java index b311ce4e..11382830 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java @@ -197,7 +197,7 @@ public class PowerScheduleService { wfInfos.forEach(wfInfo -> { // 1. 先生成调度记录,防止不调度的情况发生 - Long wfInstanceId = workflowInstanceManager.create(wfInfo, null, wfInfo.getNextTriggerTime()); + Long wfInstanceId = workflowInstanceManager.create(wfInfo, null, wfInfo.getNextTriggerTime(), null); // 2. 推入时间轮,准备调度执行 long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis(); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java index e9fa802a..73e9373c 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java @@ -8,18 +8,16 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; -import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.WorkflowContextConstant; import tech.powerjob.common.enums.InstanceStatus; -import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.enums.WorkflowInstanceStatus; import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.core.DispatchService; -import tech.powerjob.server.core.instance.InstanceService; +import tech.powerjob.server.core.helper.StatusMappingHelper; import tech.powerjob.server.core.lock.UseSegmentLock; import tech.powerjob.server.core.service.UserService; import tech.powerjob.server.core.service.WorkflowNodeHandleService; @@ -54,10 +52,6 @@ public class WorkflowInstanceManager { @Resource private AlarmCenter alarmCenter; @Resource - private InstanceService instanceService; - @Resource - private DispatchService dispatchService; - @Resource private IdGenerateService idGenerateService; @Resource private JobInfoRepository jobInfoRepository; @@ -84,12 +78,18 @@ public class WorkflowInstanceManager { * @param expectTriggerTime 预计执行时间 * @return wfInstanceId */ - public Long create(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime) { + public Long create(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime, Long parentWfInstanceId) { Long wfId = wfInfo.getId(); Long wfInstanceId = idGenerateService.allocate(); // 构造实例信息 WorkflowInstanceInfoDO newWfInstance = constructWfInstance(wfInfo, initParams, expectTriggerTime, wfId, wfInstanceId); + if (parentWfInstanceId != null) { + // 处理子工作流 + newWfInstance.setParentWfInstanceId(parentWfInstanceId); + // 直接透传上下文 + newWfInstance.setWfContext(initParams); + } PEWorkflowDAG dag = null; try { @@ -121,7 +121,7 @@ public class WorkflowInstanceManager { if (dag != null) { newWfInstance.setDag(JSON.toJSONString(dag)); } - onWorkflowInstanceFailed(e.getMessage(), newWfInstance); + handleWfInstanceFinalStatus(newWfInstance, e.getMessage(), WorkflowInstanceStatus.FAILED); } return wfInstanceId; } @@ -216,7 +216,7 @@ public class WorkflowInstanceManager { // 并发度控制 int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS); if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) { - onWorkflowInstanceFailed(String.format(SystemInstanceResult.TOO_MANY_INSTANCES, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), wfInstanceInfo); + handleWfInstanceFinalStatus(wfInstanceInfo, String.format(SystemInstanceResult.TOO_MANY_INSTANCES, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), WorkflowInstanceStatus.FAILED); return; } } @@ -248,8 +248,8 @@ public class WorkflowInstanceManager { workflowNodeHandleService.handleTaskNodes(readyNodes, dag, wfInstanceInfo); log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId); } catch (Exception e) { - log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e); - onWorkflowInstanceFailed(e.getMessage(), wfInstanceInfo); + log.error("[Workflow-{}|{}] start workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e); + handleWfInstanceFinalStatus(wfInstanceInfo, e.getMessage(), WorkflowInstanceStatus.FAILED); } } @@ -321,13 +321,13 @@ public class WorkflowInstanceManager { // 任务失败 && 不允许失败跳过,DAG 流程被打断,整体失败 if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) { log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId); - onWorkflowInstanceFailed(SystemInstanceResult.MIDDLE_JOB_FAILED, wfInstance); + handleWfInstanceFinalStatus(wfInstance, SystemInstanceResult.MIDDLE_JOB_FAILED, WorkflowInstanceStatus.FAILED); return; } // 子任务被手动停止 if (status == InstanceStatus.STOPPED) { - updateWorkflowInstanceFinalStatus(wfInstance, SystemInstanceResult.MIDDLE_JOB_STOPPED, WorkflowInstanceStatus.STOPPED); + handleWfInstanceFinalStatus(wfInstance, SystemInstanceResult.MIDDLE_JOB_STOPPED, WorkflowInstanceStatus.STOPPED); log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", wfId, wfInstanceId, instanceId); return; } @@ -342,7 +342,7 @@ public class WorkflowInstanceManager { // 这里得重新更新一下,因为 WorkflowDAGUtils#listReadyNodes 可能会更新节点状态 wfInstance.setDag(JSON.toJSONString(dag)); // 最终任务的结果作为整个 workflow 的结果 - updateWorkflowInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED); + handleWfInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED); log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId); return; } @@ -357,7 +357,7 @@ public class WorkflowInstanceManager { if (readyNodes.isEmpty()) { if (isFinish(dag)) { wfInstance.setDag(JSON.toJSONString(dag)); - updateWorkflowInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED); + handleWfInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED); log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId); return; } @@ -369,11 +369,12 @@ public class WorkflowInstanceManager { // 处理任务节点 workflowNodeHandleService.handleTaskNodes(readyNodes, dag, wfInstance); } catch (Exception e) { - onWorkflowInstanceFailed("MOVE NEXT STEP FAILED: " + e.getMessage(), wfInstance); + handleWfInstanceFinalStatus(wfInstance, "MOVE NEXT STEP FAILED: " + e.getMessage(), WorkflowInstanceStatus.FAILED); log.error("[Workflow-{}|{}] update failed.", wfId, wfInstanceId, e); } } + /** * 更新工作流上下文 * fix : 得和其他操作工作流实例的方法用同一把锁才行,不然有并发问题,会导致节点状态被覆盖 @@ -408,14 +409,46 @@ public class WorkflowInstanceManager { } - private void updateWorkflowInstanceFinalStatus(WorkflowInstanceInfoDO wfInstance, String result, WorkflowInstanceStatus workflowInstanceStatus) { + private void handleWfInstanceFinalStatus(WorkflowInstanceInfoDO wfInstance, String result, WorkflowInstanceStatus workflowInstanceStatus) { wfInstance.setStatus(workflowInstanceStatus.getV()); wfInstance.setResult(result); wfInstance.setFinishedTime(System.currentTimeMillis()); + wfInstance.setGmtModified(new Date()); workflowInstanceInfoRepository.saveAndFlush(wfInstance); + + // 处理子工作流 + if (wfInstance.getParentWfInstanceId() != null) { + // 先处理上下文 + if (workflowInstanceStatus == WorkflowInstanceStatus.SUCCEED){ + HashMap wfContext = JSON.parseObject(wfInstance.getWfContext(), new TypeReference>() { + }); + updateWorkflowContext(wfInstance.getParentWfInstanceId(),wfContext); + } + // 处理父工作流 + move(wfInstance.getWfInstanceId(),wfInstance.getParentWfInstanceId(), StatusMappingHelper.toInstanceStatus(workflowInstanceStatus),result); + } + + // 报警 + if (workflowInstanceStatus == WorkflowInstanceStatus.FAILED) { + try { + workflowInfoRepository.findById(wfInstance.getWorkflowId()).ifPresent(wfInfo -> { + WorkflowInstanceAlarm content = new WorkflowInstanceAlarm(); + + BeanUtils.copyProperties(wfInfo, content); + BeanUtils.copyProperties(wfInstance, content); + content.setResult(result); + + List userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds()); + alarmCenter.alarmFailed(content, userList); + }); + } catch (Exception ignore) { + // ignore + } + } } + private List findControlNodes(List readyNodes) { return readyNodes.stream().filter(node -> { WorkflowNodeType nodeType = WorkflowNodeType.of(node.getNodeType()); @@ -432,30 +465,4 @@ public class WorkflowInstanceManager { return true; } - private void onWorkflowInstanceFailed(String result, WorkflowInstanceInfoDO wfInstance) { - - wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV()); - wfInstance.setResult(result); - wfInstance.setFinishedTime(System.currentTimeMillis()); - wfInstance.setGmtModified(new Date()); - - workflowInstanceInfoRepository.saveAndFlush(wfInstance); - - // 报警 - try { - workflowInfoRepository.findById(wfInstance.getWorkflowId()).ifPresent(wfInfo -> { - WorkflowInstanceAlarm content = new WorkflowInstanceAlarm(); - - BeanUtils.copyProperties(wfInfo, content); - BeanUtils.copyProperties(wfInstance, content); - content.setResult(result); - - List userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds()); - alarmCenter.alarmFailed(content, userList); - }); - } catch (Exception ignore) { - // ignore - } - } - } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java index 4c58b1dd..273e0d02 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java @@ -2,6 +2,7 @@ package tech.powerjob.server.core.workflow; import com.alibaba.fastjson.JSON; import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.WorkflowInstanceStatus; @@ -61,6 +62,10 @@ public class WorkflowInstanceService { if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { throw new PowerJobException("workflow instance already stopped"); } + // 先终止父工作流 + if (wfInstance.getParentWfInstanceId() != null) { + stopWorkflowInstance(wfInstance.getParentWfInstanceId(),appId); + } // 停止所有已启动且未完成的服务 PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); // 遍历所有节点,终止正在运行的 @@ -70,8 +75,14 @@ public class WorkflowInstanceService { log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId()); node.setStatus(InstanceStatus.STOPPED.getV()); node.setResult(SystemInstanceResult.STOPPED_BY_USER); - // 注意,这里并不保证一定能终止正在运行的实例 - instanceService.stopInstance(appId,node.getInstanceId()); + // 特殊处理嵌套工作流节点 + if (Objects.equals(node.getNodeType(), WorkflowNodeType.NESTED_WORKFLOW.getCode())) { + stopWorkflowInstance(node.getInstanceId(), appId); + // + } else { + // 注意,这里并不保证一定能终止正在运行的实例 + instanceService.stopInstance(appId, node.getInstanceId()); + } } } catch (Exception e) { log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSON.toJSONString(node), e); @@ -163,7 +174,7 @@ public class WorkflowInstanceService { * 即处于 [失败且不允许跳过] 的节点 * 而且仅会操作工作流实例 DAG 中的节点信息(状态、result) * 并不会改变对应任务实例中的任何信息 - * + *

* 还是加把锁保平安 ~ * * @param wfInstanceId 工作流实例 ID diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java index 9a4d6794..ff891c33 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java @@ -55,7 +55,7 @@ public class WorkflowService { /** * 保存/修改工作流信息 - * + *

* 注意这里不会保存 DAG 信息 * * @param req 请求 @@ -271,7 +271,7 @@ public class WorkflowService { WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay); - Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay); + Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay, null); if (delay <= 0) { workflowInstanceManager.start(wfInfo, wfInstanceId); } else { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java new file mode 100644 index 00000000..93afe9d9 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java @@ -0,0 +1,83 @@ +package tech.powerjob.server.core.workflow.hanlder.impl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.WorkflowInstanceStatus; +import tech.powerjob.common.enums.WorkflowNodeType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.core.workflow.WorkflowInstanceManager; +import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler; +import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; +import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; +import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; +import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; + +import javax.annotation.Resource; +import java.util.Date; + +/** + * @author Echo009 + * @since 2021/12/13 + */ +@Component +@Slf4j +public class NestedWorkflowNodeHandler implements TaskNodeHandler { + + @Resource + private WorkflowInfoRepository workflowInfoRepository; + + @Resource + private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; + + @Resource + private WorkflowInstanceManager workflowInstanceManager; + + @Override + public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) { + // check + Long wfId = node.getWfId(); + WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null); + if (targetWf == null || targetWf.getStatus() == SwitchableStatus.DELETED.getV()) { + if (targetWf == null) { + log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getWfId()); + } else { + log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) has been deleted!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getWfId()); + } + throw new PowerJobException("invalid nested workflow node," + node.getNodeId()); + } + if (node.getInstanceId() != null) { + // 处理重试的情形,不需要创建实例,仅需要更改对应实例的状态 + WorkflowInstanceInfoDO wfInstance = workflowInstanceInfoRepository.findByWfInstanceId(node.getInstanceId()).orElse(null); + if (wfInstance == null) { + log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow instance({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getInstanceId()); + throw new PowerJobException("invalid workflow instance id" + node.getNodeId()); + } + wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); + wfInstance.setGmtModified(new Date()); + workflowInstanceInfoRepository.saveAndFlush(wfInstance); + } else { + // 透传当前的上下文创建新的工作流实例 + String wfContext = wfInstanceInfo.getWfContext(); + Long instanceId = workflowInstanceManager.create(targetWf, wfContext, System.currentTimeMillis(), wfInstanceInfo.getWfInstanceId()); + node.setInstanceId(instanceId); + } + node.setStartTime(CommonUtils.formatTime(System.currentTimeMillis())); + node.setStatus(InstanceStatus.RUNNING.getV()); + } + + @Override + public void startTaskInstance(PEWorkflowDAG.Node node) { + Long wfId = node.getWfId(); + WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null); + workflowInstanceManager.start(targetWf, node.getInstanceId()); + } + + @Override + public WorkflowNodeType matchingType() { + return WorkflowNodeType.NESTED_WORKFLOW; + } +} diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/WorkflowInstanceInfoDO.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/WorkflowInstanceInfoDO.java index 54714221..844b52c6 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/WorkflowInstanceInfoDO.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/WorkflowInstanceInfoDO.java @@ -33,6 +33,10 @@ public class WorkflowInstanceInfoDO { * workflowInstanceId(任务实例表都使用单独的ID作为主键以支持潜在的分表需求) */ private Long wfInstanceId; + /** + * 上层工作流实例 ID (用于支持工作流嵌套) + */ + private Long parentWfInstanceId; private Long workflowId; /**