From 4ee6300c6a564db7a0ed0a12164f0cf6fb82edad Mon Sep 17 00:00:00 2001
From: Echo009
Date: Thu, 18 Feb 2021 15:15:06 +0800
Subject: [PATCH] feat: workflow nodes support skip-on-failure
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../powerjob/common/InstanceStatus.java       |  16 ++-
 .../powerjob/common/SystemInstanceResult.java |  38 ++++-
 .../worker/handler/WorkerRequestHandler.java  |   2 +-
 .../server/service/DispatchService.java       |  12 +-
 .../powerjob/server/service/JobService.java   |   2 +-
 .../service/instance/InstanceService.java     |   4 +-
 .../server/service/timing/CleanService.java   |   6 +-
 .../timing/schedule/OmsScheduleService.java   |   2 +-
 .../workflow/WorkflowInstanceManager.java     | 136 ++++++++++++------
 .../workflow/WorkflowInstanceService.java     |   8 +-
 .../powerjob/server/test/RepositoryTest.java  |   2 +-
 .../tester/AppendWorkflowContextTester.java   |   9 +-
 12 files changed, 164 insertions(+), 73 deletions(-)

diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java
index c1d3da7b..e8d7b705 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java
@@ -15,7 +15,9 @@ import java.util.List;
 @Getter
 @AllArgsConstructor
 public enum InstanceStatus {
-
+    /**
+     *
+     */
     WAITING_DISPATCH(1, "等待派发"),
     WAITING_WORKER_RECEIVE(2, "等待Worker接收"),
     RUNNING(3, "运行中"),
@@ -27,10 +29,14 @@ public enum InstanceStatus {
     private final int v;
     private final String des;

-    // generalized "running" statuses
-    public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
-    // terminal statuses
-    public static final List<Integer> finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, CANCELED.v, STOPPED.v);
+    /**
+     * generalized "running" statuses
+     */
+    public static final List<Integer> GENERALIZED_RUNNING_STATUS = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
+    /**
+     * terminal statuses
+     */
+    public static final List<Integer> FINISHED_STATUS = Lists.newArrayList(FAILED.v, SUCCEED.v, CANCELED.v, STOPPED.v);

     public static InstanceStatus of(int v) {
         for (InstanceStatus is : values()) {
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java
index 999bc54e..79346426 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java
@@ -8,30 +8,54 @@ package com.github.kfcfans.powerjob.common;
  */
 public class SystemInstanceResult {

+    private SystemInstanceResult() {
+
+    }
+
     /* *********** for normal instances only *********** */
-    // too many task instances running at the same time
+    /**
+     * too many task instances running at the same time
+     */
     public static final String TOO_MANY_INSTANCES = "too many instances(%d>%d)";
-    // no worker available
+    /**
+     * no worker available
+     */
     public static final String NO_WORKER_AVAILABLE = "no worker available";
-    // task execution timed out
+    /**
+     * task execution timed out
+     */
     public static final String INSTANCE_EXECUTE_TIMEOUT = "instance execute timeout";
-    // failed to create the root task
+    /**
+     * failed to create the root task
+     */
     public static final String TASK_INIT_FAILED = "create root task failed";
-    // unknown error
+    /**
+     * unknown error
+     */
     public static final String UNKNOWN_BUG = "unknown bug";
-    // TaskTracker has not reported for a long time
+    /**
+     * TaskTracker has not reported for a long time
+     */
     public static final String REPORT_TIMEOUT = "worker report timeout, maybe TaskTracker down";

     public static final String CAN_NOT_FIND_JOB_INFO = "can't find job info";

     /* *********** for workflows only *********** */
+
     public static final String MIDDLE_JOB_FAILED = "middle job failed";
     public static final String MIDDLE_JOB_STOPPED = "middle job stopped by user";
     public static final String CAN_NOT_FIND_JOB = "can't find some job";
-    // stopped manually by the user
+    /**
+     * stopped manually by the user
+     */
     public static final String STOPPED_BY_USER = "stopped by user";
     public static final String CANCELED_BY_USER = "canceled by user";

+    /**
+     * invalid DAG
+     */
+    public static final String INVALID_DAG = "invalid dag";
+
 }
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/handler/WorkerRequestHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/handler/WorkerRequestHandler.java
index f2b47c78..d554d98a 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/handler/WorkerRequestHandler.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/handler/WorkerRequestHandler.java
@@ -73,7 +73,7 @@ public class WorkerRequestHandler {
         instanceManager.updateStatus(req);

         // terminal statuses (success / failure) require a reply message
-        if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) {
+        if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) {
             return Optional.of(AskResponse.succeed(null));
         }
         return Optional.empty();
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java
index 0bef44d1..46188d80 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java
@@ -51,7 +51,13 @@ public class DispatchService {

     private static final Splitter COMMA_SPLITTER = Splitter.on(",");

-    @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024)
+    /**
+     * Re-dispatch a task instance
+     *
+     * @param jobInfo    job info (note: the job info passed in here may be "empty")
+     * @param instanceId instance ID
+     */
+    @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId() ?: 0", concurrencyLevel = 1024)
     public void redispatch(JobInfoDO jobInfo, long instanceId) {
         // keep this for now
         dispatch(jobInfo, instanceId);
@@ -67,10 +73,10 @@
      * moved to {@link InstanceManager#updateStatus} for handling
      * **************************************************
      *
-     * @param jobInfo meta info of the job; note that the jobInfo passed in here may be an empty object
+     * @param jobInfo meta info of the job
     * @param instanceId instance ID
      */
-    @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024)
+    @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId() ?: 0", concurrencyLevel = 1024)
     public void dispatch(JobInfoDO jobInfo, long instanceId) {
         // check whether the current task has been canceled
         InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
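
A note on the new segment-lock key above: "#jobInfo.getId().intValue()" blows up whenever the method is entered with an "empty" JobInfoDO (exactly the redispatch case the new javadoc calls out), while "#jobInfo.getId() ?: 0" falls back to 0. A minimal sketch of that behaviour, assuming the key is evaluated as a Spring SpEL expression (which the "#jobInfo" syntax suggests); the demo class and the JobInfo stand-in below are illustrative only:

    import org.springframework.expression.ExpressionParser;
    import org.springframework.expression.spel.standard.SpelExpressionParser;
    import org.springframework.expression.spel.support.StandardEvaluationContext;

    public class SegmentLockKeyDemo {

        /** Hypothetical stand-in for JobInfoDO: only the getter the expression touches. */
        public static class JobInfo {
            private final Long id;
            public JobInfo(Long id) { this.id = id; }
            public Long getId() { return id; }
        }

        public static void main(String[] args) {
            ExpressionParser parser = new SpelExpressionParser();
            StandardEvaluationContext ctx = new StandardEvaluationContext();

            // "Empty" job info, as passed by redispatch: the Elvis operator yields the fallback 0,
            // where the old key ("#jobInfo.getId().intValue()") would have failed on the null id.
            ctx.setVariable("jobInfo", new JobInfo(null));
            System.out.println(parser.parseExpression("#jobInfo.getId() ?: 0").getValue(ctx)); // 0

            // Normal case: the real id is used as the lock key.
            ctx.setVariable("jobInfo", new JobInfo(42L));
            System.out.println(parser.parseExpression("#jobInfo.getId() ?: 0").getValue(ctx)); // 42
        }
    }
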
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java
index 153d1791..23722ab5 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java
@@ -185,7 +185,7 @@ public class JobService {
         if (!TimeExpressionType.frequentTypes.contains(jobInfoDO.getTimeExpressionType())) {
             return;
         }
-        List executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus);
+        List executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.GENERALIZED_RUNNING_STATUS);
         if (CollectionUtils.isEmpty(executeLogs)) {
             return;
         }
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java
index 33d1e4da..32e86989 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java
@@ -107,7 +107,7 @@ public class InstanceService {
         InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);

         // check the status: only running instances can be stopped
-        if (!InstanceStatus.generalizedRunningStatus.contains(instanceInfo.getStatus())) {
+        if (!InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(instanceInfo.getStatus())) {
             throw new IllegalArgumentException("can't stop finished instance!");
         }

@@ -153,7 +153,7 @@
         log.info("[Instance-{}] retry instance in appId: {}", instanceId, appId);
         InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);

-        if (!InstanceStatus.finishedStatus.contains(instanceInfo.getStatus())) {
+        if (!InstanceStatus.FINISHED_STATUS.contains(instanceInfo.getStatus())) {
             throw new PowerJobException("Only stopped instance can be retry!");
         }
         // retrying workflow instances is not supported yet
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java
index c7654aeb..fd2741ab 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java
@@ -51,7 +51,9 @@ public class CleanService {

     private static final int TEMPORARY_RETENTION_DAY = 3;

-    // scheduled cleanup at 3 AM every day
+    /**
+     * scheduled cleanup at 3 AM every day
+     */
     private static final String CLEAN_TIME_EXPRESSION = "0 0 3 * * ?";

     private static final String HISTORY_DELETE_LOCK = "history_delete_lock";
@@ -152,7 +154,7 @@
         }
         try {
             Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
-            int num = instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, InstanceStatus.finishedStatus);
+            int num = instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, InstanceStatus.FINISHED_STATUS);
             log.info("[CleanService] deleted {} instanceInfo records whose modify time before {}.", num, t);
         }catch (Exception e) {
             log.warn("[CleanService] clean instanceInfo failed.", e);
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java
index 69f66ff1..ddac25c3 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java
@@ -228,7 +228,7 @@
             return;
         }
         // check whether related instances exist in the log (instance) table
-        List runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.generalizedRunningStatus);
+        List runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.GENERALIZED_RUNNING_STATUS);
         Set runningJobIdSet = Sets.newHashSet(runningJobIdList);
         List notRunningJobIds = Lists.newLinkedList();

diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java
index d67f53f4..fe658509 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java
@@ -8,13 +8,11 @@ import com.github.kfcfans.powerjob.common.utils.JsonUtils;
 import com.github.kfcfans.powerjob.common.utils.SegmentLock;
 import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
 import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils;
-import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
-import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
-import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO;
-import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInstanceInfoDO;
+import com.github.kfcfans.powerjob.server.persistence.core.model.*;
 import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
 import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
 import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
+import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowNodeInfoRepository;
 import com.github.kfcfans.powerjob.server.service.DispatchService;
 import com.github.kfcfans.powerjob.server.service.UserService;
 import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter;
@@ -24,6 +22,7 @@
 import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
 import com.github.kfcfans.powerjob.server.service.lock.local.UseSegmentLock;
 import com.google.common.collect.*;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.BeanUtils;
 import org.springframework.stereotype.Service;
@@ -55,6 +54,8 @@ public class WorkflowInstanceManager {
     private WorkflowInfoRepository workflowInfoRepository;
     @Resource
     private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
+    @Resource
+    private WorkflowNodeInfoRepository workflowNodeInfoRepository;

     private final SegmentLock segmentLock = new SegmentLock(16);

@@ -75,7 +76,7 @@ public class WorkflowInstanceManager {
         Long wfId = wfInfo.getId();
         Long wfInstanceId = idGenerateService.allocate();

-        // create only, without writing the DAG info
+        // create and initialize the DAG info
         Date now = new Date();
         WorkflowInstanceInfoDO newWfInstance = new WorkflowInstanceInfoDO();
         newWfInstance.setAppId(wfInfo.getAppId());
@@ -100,11 +101,22 @@ public class WorkflowInstanceManager {
         int needNum = allJobIds.size();
         long dbNum = jobInfoRepository.countByAppIdAndStatusAndIdIn(wfInfo.getAppId(), SwitchableStatus.ENABLE.getV(), allJobIds);
         log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum);
-
+        // set the DAG once up front, so this info is persisted even if something goes wrong below
+        newWfInstance.setDag(JSON.toJSONString(dag));
         if (dbNum < allJobIds.size()) {
log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", wfId, wfInstanceId, needNum, dbNum); onWorkflowInstanceFailed(SystemInstanceResult.CAN_NOT_FIND_JOB, newWfInstance); } else { + // 再次校验 DAG + boolean valid = WorkflowDAGUtils.valid(dag); + if (!valid) { + log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you has modified the DAG info directly in database!", wfId, wfInstanceId); + onWorkflowInstanceFailed(SystemInstanceResult.INVALID_DAG, newWfInstance); + } else { + initNodeInfo(dag); + // 再 set 一次,此时工作流中的节点信息已经完全初始化 + newWfInstance.setDag(JSON.toJSONString(dag)); + } workflowInstanceInfoRepository.saveAndFlush(newWfInstance); } return wfInstanceId; @@ -145,28 +157,20 @@ public class WorkflowInstanceManager { } try { - - PEWorkflowDAG peWorkflowDAG = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); + // 从实例中读取工作流信息 + PEWorkflowDAG peWorkflowDAG = JSON.parseObject(wfInstanceInfo.getDag(), PEWorkflowDAG.class); List roots = WorkflowDAGUtils.listRoots(peWorkflowDAG); peWorkflowDAG.getNodes().forEach(node -> node.setStatus(InstanceStatus.WAITING_DISPATCH.getV())); - Map nodeId2JobInfoMap = Maps.newHashMap(); // 创建所有的根任务 roots.forEach(root -> { // 注意:这里必须保证任务实例全部创建成功,如果在这里创建实例部分失败,会导致 DAG 信息不会更新,已经生成的实例节点在工作流日志中没法展示 - // 如果 job 信息缺失,在 dispatch 的时候会失败,继而使整个工作流失败 - JobInfoDO jobInfo = jobInfoRepository.findById(root.getJobId()).orElseGet(JobInfoDO::new); - if (jobInfo.getId() == null) { - // 在创建工作流实例到当前的这段时间内 job 信息被物理删除了 - log.error("[Workflow-{}|{}]job info of current node(nodeId={},jobId={}) is not present! maybe you have deleted the job!", wfInfo.getId(), wfInstanceId, root.getNodeId(), root.getJobId()); - } - nodeId2JobInfoMap.put(root.getNodeId(), jobInfo); // instanceParam 传递的是工作流实例的 wfContext - Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), jobInfo.getJobParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis()); + Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), root.getJobParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis()); root.setInstanceId(instanceId); root.setStatus(InstanceStatus.RUNNING.getV()); - log.info("[Workflow-{}|{}] create root instance(nodeId={},jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId,root.getNodeId(), root.getJobId(), instanceId); + log.info("[Workflow-{}|{}] create root instance(nodeId={},jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, root.getNodeId(), root.getJobId(), instanceId); }); // 持久化 @@ -176,7 +180,7 @@ public class WorkflowInstanceManager { log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId); // 真正开始执行根任务 - roots.forEach(root -> runInstance(nodeId2JobInfoMap.get(root.getNodeId()), root.getInstanceId())); + roots.forEach(this::runInstance); } catch (Exception e) { log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e); @@ -226,19 +230,24 @@ public class WorkflowInstanceManager { // 更新完成节点状态 boolean allFinished = true; + PEWorkflowDAG.Node instanceNode = null; for (PEWorkflowDAG.Node node : dag.getNodes()) { if (instanceId.equals(node.getInstanceId())) { node.setStatus(status.getV()); node.setResult(result); - - log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId,node.getNodeId(), node.getJobId(), instanceId, status.name(), result); + 
+                instanceNode = node;
+                log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getNodeId(), node.getJobId(), instanceId, status.name(), result);
             }
-
-            if (InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) {
+            if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {
                 allFinished = false;
             }
             nodeId2Node.put(node.getNodeId(), node);
         }
+        if (instanceNode == null) {
+            // the node's instance in the DAG has already been overwritten (an in-place retry generated new instance info), so just ignore this report
+            log.warn("[Workflow-{}|{}] current job instance(instanceId={}) is dissociative! it will be ignored! ", wfId, wfInstanceId, instanceId);
+            return;
+        }

         wfInstance.setGmtModified(new Date());
         wfInstance.setDag(JSON.toJSONString(dag));
@@ -249,8 +258,8 @@
             return;
         }
-        // the task failed: the DAG flow is interrupted and the whole workflow fails
-        if (status == InstanceStatus.FAILED) {
+        // the task failed && skip-on-failure is not allowed: the DAG flow is interrupted and the whole workflow fails
+        if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) {
             log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId);
             onWorkflowInstanceFailed(SystemInstanceResult.MIDDLE_JOB_FAILED, wfInstance);
             return;
         }
@@ -285,28 +294,25 @@
         // recompute the task instances that need to be dispatched
         List readyNodes = Lists.newArrayList();
-        Map nodeId2JobInfoMap = Maps.newHashMap();
         relyMap.keySet().forEach(nodeId -> {
             PEWorkflowDAG.Node currentNode = nodeId2Node.get(nodeId);
-            // skip finished nodes (theoretically FAILED can never show up here) and already-dispatched nodes (an InstanceId exists)
-            if (currentNode.getStatus() == InstanceStatus.SUCCEED.getV() || currentNode.getInstanceId() != null) {
+            // skip finished nodes (succeeded or failed) and already-dispatched nodes (an InstanceId exists)
+            if (currentNode.getStatus() == InstanceStatus.SUCCEED.getV()
+                    || currentNode.getStatus() == InstanceStatus.FAILED.getV()
+                    || currentNode.getInstanceId() != null) {
                 return;
             }
-            // check the completion of every dependency of this node: if any one has not succeeded, it cannot run yet
-            for (Long reliedJobId : relyMap.get(nodeId)) {
-                if (nodeId2Node.get(reliedJobId).getStatus() != InstanceStatus.SUCCEED.getV()) {
+            // check the completion of every dependency of this node: if any one has not finished, it cannot run yet
+            for (Long reliedNodeId : relyMap.get(nodeId)) {
+                // FAILED is acceptable here only because skip-on-failure nodes exist; a failed node that must not be skipped never reaches this point (the workflow is interrupted first)
+                if (nodeId2Node.get(reliedNodeId).getStatus() != InstanceStatus.SUCCEED.getV()
+                        && nodeId2Node.get(reliedNodeId).getStatus() != InstanceStatus.FAILED.getV()) {
                     return;
                 }
             }

             // likewise: every task instance must be created successfully here, to avoid partial failures leaving already-created instance nodes unshowable in the workflow log
-            JobInfoDO jobInfo = jobInfoRepository.findById(currentNode.getJobId()).orElseGet(JobInfoDO::new);
-            if (jobInfo.getId() == null) {
-                // the job info was physically deleted between workflow instance creation and now
maybe you have deleted the job!", wfId, wfInstanceId, currentNode.getNodeId(), currentNode.getJobId()); - } - nodeId2JobInfoMap.put(nodeId, jobInfo); // instanceParam 传递的是工作流实例的 wfContext - Long newInstanceId = instanceService.create(jobInfo.getId(), wfInstance.getAppId(), jobInfo.getJobParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis()); + Long newInstanceId = instanceService.create(currentNode.getJobId(), wfInstance.getAppId(), currentNode.getJobParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis()); currentNode.setInstanceId(newInstanceId); currentNode.setStatus(InstanceStatus.RUNNING.getV()); readyNodes.add(currentNode); @@ -316,7 +322,7 @@ public class WorkflowInstanceManager { wfInstance.setDag(JSON.toJSONString(dag)); workflowInstanceInfoRepository.saveAndFlush(wfInstance); // 持久化结束后,开始调度执行所有的任务 - readyNodes.forEach(node -> runInstance(nodeId2JobInfoMap.get(node.getNodeId()), node.getInstanceId())); + readyNodes.forEach(this::runInstance); } catch (Exception e) { onWorkflowInstanceFailed("MOVE NEXT STEP FAILED: " + e.getMessage(), wfInstance); @@ -332,9 +338,10 @@ public class WorkflowInstanceManager { /** * 更新工作流上下文 - * @since 2021/02/05 + * * @param wfInstanceId 工作流实例 * @param appendedWfContextData 追加的上下文数据 + * @since 2021/02/05 */ @UseSegmentLock(type = "updateWfContext", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024) public void updateWorkflowContext(Long wfInstanceId, Map appendedWfContextData) { @@ -362,17 +369,53 @@ public class WorkflowInstanceManager { } + /** + * 初始化节点信息 + * + * + * @param dag pe dag + * @since 20210205 + */ + private void initNodeInfo(PEWorkflowDAG dag) { + // 初始化节点信息(是否启用、是否允许失败跳过、节点参数) + for (PEWorkflowDAG.Node node : dag.getNodes()) { + Optional nodeInfoOpt = workflowNodeInfoRepository.findById(node.getNodeId()); + // 不考虑极端情况 + JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new); + + if (!nodeInfoOpt.isPresent()) { + // 默认启用 + 不允许失败跳过 + node.setEnable(true); + node.setSkipWhenFailed(false); + node.setJobParams(jobInfo.getJobParams()); + } else { + + WorkflowNodeInfoDO nodeInfo = nodeInfoOpt.get(); + node.setEnable(nodeInfo.getEnable()); + node.setSkipWhenFailed(nodeInfo.getSkipWhenFailed()); + // 如果节点中指定了参数信息,则取节点的,否则取 Job 上的 + if (!StringUtils.isBlank(nodeInfo.getNodeParams())) { + node.setJobParams(nodeInfo.getNodeParams()); + } else { + node.setJobParams(jobInfo.getJobParams()); + } + + } + } + } + /** * 运行任务实例 * 需要将创建和运行任务实例分离,否则在秒失败情况下,会发生DAG覆盖更新的问题 * - * @param jobInfo 任务信息 - * @param instanceId 任务实例ID + * @param node 节点信息 */ - private void runInstance(JobInfoDO jobInfo, Long instanceId) { + private void runInstance(PEWorkflowDAG.Node node) { + + JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new); // 洗去时间表达式类型 jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV()); - dispatchService.dispatch(jobInfo, instanceId); + dispatchService.dispatch(jobInfo, node.getInstanceId()); } private void onWorkflowInstanceFailed(String result, WorkflowInstanceInfoDO wfInstance) { @@ -400,4 +443,9 @@ public class WorkflowInstanceManager { // ignore } } + + private boolean isNotAllowSkipWhenFailed(PEWorkflowDAG.Node node) { + // 默认不允许跳过 + return node.getSkipWhenFailed() == null || !node.getSkipWhenFailed(); + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java 
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java
index 3be885c4..7ca5763a 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java
@@ -1,6 +1,6 @@
 package com.github.kfcfans.powerjob.server.service.workflow;

-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSON;
 import com.github.kfcfans.powerjob.common.InstanceStatus;
 import com.github.kfcfans.powerjob.common.PowerJobException;
 import com.github.kfcfans.powerjob.common.SystemInstanceResult;
@@ -46,10 +46,10 @@ public class WorkflowInstanceService {
             throw new PowerJobException("workflow instance already stopped");
         }
         // stop every instance that has been started but has not finished yet
-        PEWorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
+        PEWorkflowDAG workflowDAG = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
         WorkflowDAGUtils.listRoots(workflowDAG).forEach(node -> {
             try {
-                if (node.getInstanceId() != null && InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) {
+                if (node.getInstanceId() != null && InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {
                     log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId());
                     node.setStatus(InstanceStatus.STOPPED.getV());
                     node.setResult(SystemInstanceResult.STOPPED_BY_USER);
@@ -57,7 +57,7 @@
                     instanceService.stopInstance(node.getInstanceId());
                 }
             }catch (Exception e) {
-                log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSONObject.toJSONString(node), e);
+                log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSON.toJSONString(node), e);
             }
         });
diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java
index ba5f8530..0e6306e0 100644
--- a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java
+++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java
@@ -106,7 +106,7 @@ public class RepositoryTest {

     @Test
     public void testDeleteInstanceInfo() {
-        instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), InstanceStatus.finishedStatus);
+        instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), InstanceStatus.FINISHED_STATUS);
     }

     @Test
diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/tester/AppendWorkflowContextTester.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/tester/AppendWorkflowContextTester.java
index 0cd46e65..993ab002 100644
--- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/tester/AppendWorkflowContextTester.java
+++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/tester/AppendWorkflowContextTester.java
@@ -1,7 +1,6 @@
 package com.github.kfcfans.powerjob.samples.tester;

 import com.github.kfcfans.powerjob.common.WorkflowContextConstant;
-import com.github.kfcfans.powerjob.common.utils.JsonUtils;
 import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
 import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
 import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
@@ -19,6 +18,8 @@ import java.util.Map;
 @Component
 public class AppendWorkflowContextTester implements BasicProcessor {

+    private static final String FAIL_CODE = "0";
+
     @Override
     @SuppressWarnings("squid:S106")
     public ProcessResult process(TaskContext context) throws Exception {
@@ -29,6 +30,7 @@ public class AppendWorkflowContextTester implements BasicProcessor {
         System.out.println("======= AppendWorkflowContextTester#start =======");
         System.out.println("current instance id : " + context.getInstanceId());
         System.out.println("current workflow context : " + workflowContext);
+        System.out.println("current job param : " + context.getJobParams());
         System.out.println("initParam of workflow context : " + originValue);
         int num = 0;
         try {
@@ -38,6 +40,9 @@ public class AppendWorkflowContextTester implements BasicProcessor {
         }
         context.appendData2WfContext(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY, num + 1);
         System.out.println("======= AppendWorkflowContextTester#end =======");
-        return new ProcessResult(true, JsonUtils.toJSONString(context));
+        if (FAIL_CODE.equals(context.getJobParams())) {
+            return new ProcessResult(false, "Failed!");
+        }
+        return new ProcessResult(true, "Success!");
     }
 }
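
One practical consequence for testing: initNodeInfo() in WorkflowInstanceManager resolves the effective job params per node (a non-blank node param overrides the job-level param), and AppendWorkflowContextTester now returns a failed ProcessResult whenever the resolved params equal "0". Configuring a node with params "0" and skip-on-failure enabled is therefore a simple way to exercise the new branch end to end. A minimal sketch of the resolution rule, with plain Java standing in for Commons Lang's StringUtils.isBlank and a helper name that is purely illustrative:

    public class NodeParamResolutionSketch {

        /** Mirrors the fallback in initNodeInfo(): a non-blank node param wins over the job-level param. */
        static String resolveJobParams(String nodeParams, String jobParams) {
            return (nodeParams != null && !nodeParams.trim().isEmpty()) ? nodeParams : jobParams;
        }

        public static void main(String[] args) {
            // "0" is the FAIL_CODE of AppendWorkflowContextTester, so this node would report a failure.
            System.out.println(resolveJobParams("0", "job-level params"));   // 0
            // Blank node params fall back to whatever is configured on the job itself.
            System.out.println(resolveJobParams("   ", "job-level params")); // job-level params
        }
    }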