[fix] fix the workflow result transfer fail bug

This commit is contained in:
tjq 2020-05-27 23:22:53 +08:00
parent 4c5156ec83
commit 6fd685cdfc
4 changed files with 25 additions and 14 deletions

View File

@ -125,7 +125,7 @@ public class WorkflowInstanceManager {
log.info("[Workflow-{}] start workflow successfully, wfInstanceId={}", wfInfo.getId(), wfInstanceId);
// 真正开始执行根任务
runInstance(root.getJobId(), instanceId, wfInstanceId);
runInstance(root.getJobId(), instanceId, wfInstanceId, null);
}catch (Exception e) {
wfInstanceInfo.setStatus(WorkflowInstanceStatus.FAILED.getV());
@ -193,6 +193,8 @@ public class WorkflowInstanceManager {
// 重新计算需要派发的任务
Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
Map<Long, String> jobId2InstanceParams = Maps.newHashMap();
AtomicBoolean allFinished = new AtomicBoolean(true);
relyMap.keySet().forEach(jobId -> {
@ -210,14 +212,15 @@ public class WorkflowInstanceManager {
}
// 所有依赖已经执行完毕可以执行该任务
Map<Long, String> jobId2Result = Maps.newHashMap();
// 构建下一个任务的入参 jobId -> result
relyMap.get(jobId).forEach(jid -> jobId2Result.put(jid, jobId2Node.get(jid).getResult()));
Map<Long, String> preJobId2Result = Maps.newHashMap();
// 构建下一个任务的入参 前置任务 jobId -> result
relyMap.get(jobId).forEach(jid -> preJobId2Result.put(jid, jobId2Node.get(jid).getResult()));
Long newInstanceId = instanceService.create(jobId, wfInstance.getAppId(), JsonUtils.toJSONString(jobId2Result), wfInstanceId, System.currentTimeMillis());
Long newInstanceId = instanceService.create(jobId, wfInstance.getAppId(), JsonUtils.toJSONString(preJobId2Result), wfInstanceId, System.currentTimeMillis());
jobId2Node.get(jobId).setInstanceId(newInstanceId);
jobId2InstanceId.put(jobId, newInstanceId);
jobId2InstanceParams.put(jobId, JsonUtils.toJSONString(preJobId2Result));
log.debug("[Workflow-{}] workflowInstance(wfInstanceId={}) start to process new node(jobId={},instanceId={})", wfId, wfInstanceId, jobId, newInstanceId);
});
@ -233,7 +236,7 @@ public class WorkflowInstanceManager {
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
// 持久化结束后开始调度执行所有的任务
jobId2InstanceId.forEach((jobId, newInstanceId) -> runInstance(jobId, newInstanceId, wfInstanceId));
jobId2InstanceId.forEach((jobId, newInstanceId) -> runInstance(jobId, newInstanceId, wfInstanceId, jobId2InstanceParams.get(jobId)));
}catch (Exception e) {
wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV());
@ -244,11 +247,19 @@ public class WorkflowInstanceManager {
}
}
private void runInstance(Long jobId, Long instanceId, Long wfInstanceId) {
/**
* 允许任务实例
* 需要将创建和运行任务实例分离否则在秒失败情况下会发生DAG覆盖更新的问题
* @param jobId 任务ID
* @param instanceId 任务实例ID
* @param wfInstanceId 工作流任务实例ID
* @param instanceParams 任务实例参数值为上游任务的执行结果 preJobId to preJobInstanceResult
*/
private void runInstance(Long jobId, Long instanceId, Long wfInstanceId, String instanceParams) {
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
// 洗去时间表达式类型
jobInfo.setTimeExpressionType(TimeExpressionType.API.getV());
dispatchService.dispatch(jobInfo, instanceId, 0, null, wfInstanceId);
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, wfInstanceId);
}
}

View File

@ -6,7 +6,7 @@ spring.jpa.open-in-view=false
spring.data.mongodb.repositories.type=none
# 文件上传配置
spring.servlet.multipart.enabled =true
spring.servlet.multipart.enabled=true
spring.servlet.multipart.file-size-threshold=0
spring.servlet.multipart.max-file-size=209715200
spring.servlet.multipart.max-request-size=209715200

View File

@ -18,13 +18,12 @@
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>debug</level>
<onMatch>DENY</onMatch>
<onMismatch>NEUTRAL</onMismatch>
</filter>
</appender>
<logger name="com.github.kfcfans.oms" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE" />
</logger>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>

View File

@ -36,6 +36,7 @@ public class StandaloneProcessorDemo implements BasicProcessor {
}
System.out.println("================ StandaloneProcessorDemo#process ================");
System.out.println(context.getJobParams());
// 根据控制台参数判断是否成功
boolean success = !"failed".equals(context.getJobParams());
omsLogger.info("StandaloneProcessorDemo finished process,success: .", success);