diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java index 9540cffe..60b38304 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java @@ -125,7 +125,7 @@ public class WorkflowInstanceManager { log.info("[Workflow-{}] start workflow successfully, wfInstanceId={}", wfInfo.getId(), wfInstanceId); // 真正开始执行根任务 - runInstance(root.getJobId(), instanceId, wfInstanceId); + runInstance(root.getJobId(), instanceId, wfInstanceId, null); }catch (Exception e) { wfInstanceInfo.setStatus(WorkflowInstanceStatus.FAILED.getV()); @@ -193,6 +193,8 @@ public class WorkflowInstanceManager { // 重新计算需要派发的任务 Map jobId2InstanceId = Maps.newHashMap(); + Map jobId2InstanceParams = Maps.newHashMap(); + AtomicBoolean allFinished = new AtomicBoolean(true); relyMap.keySet().forEach(jobId -> { @@ -210,14 +212,15 @@ public class WorkflowInstanceManager { } // 所有依赖已经执行完毕,可以执行该任务 - Map jobId2Result = Maps.newHashMap(); - // 构建下一个任务的入参 (jobId -> result) - relyMap.get(jobId).forEach(jid -> jobId2Result.put(jid, jobId2Node.get(jid).getResult())); + Map preJobId2Result = Maps.newHashMap(); + // 构建下一个任务的入参 (前置任务 jobId -> result) + relyMap.get(jobId).forEach(jid -> preJobId2Result.put(jid, jobId2Node.get(jid).getResult())); - Long newInstanceId = instanceService.create(jobId, wfInstance.getAppId(), JsonUtils.toJSONString(jobId2Result), wfInstanceId, System.currentTimeMillis()); + Long newInstanceId = instanceService.create(jobId, wfInstance.getAppId(), JsonUtils.toJSONString(preJobId2Result), wfInstanceId, System.currentTimeMillis()); jobId2Node.get(jobId).setInstanceId(newInstanceId); jobId2InstanceId.put(jobId, newInstanceId); + jobId2InstanceParams.put(jobId, JsonUtils.toJSONString(preJobId2Result)); log.debug("[Workflow-{}] workflowInstance(wfInstanceId={}) start to process new node(jobId={},instanceId={})", wfId, wfInstanceId, jobId, newInstanceId); }); @@ -233,7 +236,7 @@ public class WorkflowInstanceManager { workflowInstanceInfoRepository.saveAndFlush(wfInstance); // 持久化结束后,开始调度执行所有的任务 - jobId2InstanceId.forEach((jobId, newInstanceId) -> runInstance(jobId, newInstanceId, wfInstanceId)); + jobId2InstanceId.forEach((jobId, newInstanceId) -> runInstance(jobId, newInstanceId, wfInstanceId, jobId2InstanceParams.get(jobId))); }catch (Exception e) { wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV()); @@ -244,11 +247,19 @@ public class WorkflowInstanceManager { } } - private void runInstance(Long jobId, Long instanceId, Long wfInstanceId) { + /** + * 允许任务实例 + * 需要将创建和运行任务实例分离,否则在秒失败情况下,会发生DAG覆盖更新的问题 + * @param jobId 任务ID + * @param instanceId 任务实例ID + * @param wfInstanceId 工作流任务实例ID + * @param instanceParams 任务实例参数,值为上游任务的执行结果: preJobId to preJobInstanceResult + */ + private void runInstance(Long jobId, Long instanceId, Long wfInstanceId, String instanceParams) { JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); // 洗去时间表达式类型 jobInfo.setTimeExpressionType(TimeExpressionType.API.getV()); - dispatchService.dispatch(jobInfo, instanceId, 0, null, wfInstanceId); + dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, wfInstanceId); } } diff --git a/oh-my-scheduler-server/src/main/resources/application.properties b/oh-my-scheduler-server/src/main/resources/application.properties index 0d050560..b068bee6 100644 --- a/oh-my-scheduler-server/src/main/resources/application.properties +++ b/oh-my-scheduler-server/src/main/resources/application.properties @@ -6,7 +6,7 @@ spring.jpa.open-in-view=false spring.data.mongodb.repositories.type=none # 文件上传配置 -spring.servlet.multipart.enabled =true +spring.servlet.multipart.enabled=true spring.servlet.multipart.file-size-threshold=0 spring.servlet.multipart.max-file-size=209715200 spring.servlet.multipart.max-request-size=209715200 diff --git a/oh-my-scheduler-server/src/main/resources/logback-dev.xml b/oh-my-scheduler-server/src/main/resources/logback-dev.xml index 071a2a86..6ecd910f 100644 --- a/oh-my-scheduler-server/src/main/resources/logback-dev.xml +++ b/oh-my-scheduler-server/src/main/resources/logback-dev.xml @@ -18,13 +18,12 @@ ${CONSOLE_LOG_PATTERN} utf8 - - debug - DENY - NEUTRAL - + + + + diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/samples/processors/StandaloneProcessorDemo.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/samples/processors/StandaloneProcessorDemo.java index 3efe65de..18e8db81 100644 --- a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/samples/processors/StandaloneProcessorDemo.java +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/samples/processors/StandaloneProcessorDemo.java @@ -36,6 +36,7 @@ public class StandaloneProcessorDemo implements BasicProcessor { } System.out.println("================ StandaloneProcessorDemo#process ================"); + System.out.println(context.getJobParams()); // 根据控制台参数判断是否成功 boolean success = !"failed".equals(context.getJobParams()); omsLogger.info("StandaloneProcessorDemo finished process,success: .", success);