feat: workflow nodes support skip-on-failure
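Each DAG node is now initialized from WorkflowNodeInfoDO with an enable switch, a skipWhenFailed flag, and node-level params that override the job's params. A failed node only interrupts the workflow when skipping is not allowed, and downstream nodes treat a skipped failure like a finished dependency. Workflow startup now reads and validates the DAG snapshot stored on the instance itself, and the status constant lists are renamed to GENERALIZED_RUNNING_STATUS / FINISHED_STATUS.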

Echo009 committed 2021-02-18 15:15:06 +08:00
parent 11b712332d
commit 4ee6300c6a
12 changed files with 164 additions and 73 deletions

View File

@@ -15,7 +15,9 @@ import java.util.List;
 @Getter
 @AllArgsConstructor
 public enum InstanceStatus {
+    /**
+     *
+     */
     WAITING_DISPATCH(1, "等待派发"),
     WAITING_WORKER_RECEIVE(2, "等待Worker接收"),
     RUNNING(3, "运行中"),
@@ -27,10 +29,14 @@ public enum InstanceStatus {
     private final int v;
     private final String des;
-    // generalized "running" statuses
-    public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
-    // terminal statuses
-    public static final List<Integer> finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, CANCELED.v, STOPPED.v);
+    /**
+     * Generalized "running" statuses (not yet finished).
+     */
+    public static final List<Integer> GENERALIZED_RUNNING_STATUS = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
+    /**
+     * Terminal (finished) statuses.
+     */
+    public static final List<Integer> FINISHED_STATUS = Lists.newArrayList(FAILED.v, SUCCEED.v, CANCELED.v, STOPPED.v);
     public static InstanceStatus of(int v) {
         for (InstanceStatus is : values()) {

View File

@@ -8,30 +8,54 @@ package com.github.kfcfans.powerjob.common;
  */
 public class SystemInstanceResult {
+    private SystemInstanceResult() {
+    }
+
     /* *********** for ordinary instances only *********** */
-    // too many task instances running at the same time
+    /**
+     * Too many task instances running at the same time.
+     */
     public static final String TOO_MANY_INSTANCES = "too many instances(%d>%d)";
-    // no worker available
+    /**
+     * No worker available.
+     */
     public static final String NO_WORKER_AVAILABLE = "no worker available";
-    // task execution timed out
+    /**
+     * Task execution timed out.
+     */
     public static final String INSTANCE_EXECUTE_TIMEOUT = "instance execute timeout";
-    // failed to create the root task
+    /**
+     * Failed to create the root task.
+     */
     public static final String TASK_INIT_FAILED = "create root task failed";
-    // unknown bug
+    /**
+     * Unknown bug.
+     */
     public static final String UNKNOWN_BUG = "unknown bug";
-    // the TaskTracker has not reported for a long time
+    /**
+     * The TaskTracker has not reported for a long time.
+     */
     public static final String REPORT_TIMEOUT = "worker report timeout, maybe TaskTracker down";
     public static final String CAN_NOT_FIND_JOB_INFO = "can't find job info";
     /* *********** for workflow only *********** */
     public static final String MIDDLE_JOB_FAILED = "middle job failed";
     public static final String MIDDLE_JOB_STOPPED = "middle job stopped by user";
     public static final String CAN_NOT_FIND_JOB = "can't find some job";
-    // stopped manually by the user
+    /**
+     * Stopped manually by the user.
+     */
     public static final String STOPPED_BY_USER = "stopped by user";
     public static final String CANCELED_BY_USER = "canceled by user";
+
+    /**
+     * Invalid DAG.
+     */
+    public static final String INVALID_DAG = "invalid dag";
 }
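Most of these constants are plain strings, but TOO_MANY_INSTANCES is a format template whose two placeholders are filled via String.format. A one-line usage sketch (the counts here are hypothetical):

    // 11 running instances against a limit of 10
    String msg = String.format(SystemInstanceResult.TOO_MANY_INSTANCES, 11, 10);
    // -> "too many instances(11>10)"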

View File

@@ -73,7 +73,7 @@ public class WorkerRequestHandler {
     instanceManager.updateStatus(req);
     // a terminal status (success/failure) requires a reply message
-    if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) {
+    if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) {
         return Optional.of(AskResponse.succeed(null));
     }
     return Optional.empty();

View File

@@ -51,7 +51,13 @@ public class DispatchService {
     private static final Splitter COMMA_SPLITTER = Splitter.on(",");
-    @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024)
+    /**
+     * Re-dispatch a task instance.
+     *
+     * @param jobInfo    job info (note: the jobInfo passed in here may be an empty object)
+     * @param instanceId instance ID
+     */
+    @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId() ?: 0", concurrencyLevel = 1024)
     public void redispatch(JobInfoDO jobInfo, long instanceId) {
         // kept as-is for now
         dispatch(jobInfo, instanceId);
@@ -67,10 +73,10 @@ public class DispatchService {
      * migrated to {@link InstanceManager#updateStatus} for handling
      * **************************************************
      *
-     * @param jobInfo    job metadata (note: the jobInfo passed in here may be an empty object)
+     * @param jobInfo    job metadata
      * @param instanceId task instance ID
      */
-    @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024)
+    @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId() ?: 0", concurrencyLevel = 1024)
     public void dispatch(JobInfoDO jobInfo, long instanceId) {
         // check whether the current task has been canceled
         InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
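A side note on the lock-key change: the old key called intValue() on the job id, which blows up when redispatch receives an empty JobInfoDO whose id is null; the Elvis operator falls back to a constant bucket instead. A minimal standalone sketch (assuming @UseSegmentLock evaluates its key attribute as a standard SpEL expression, which the "#jobInfo..." syntax suggests; the JobInfo class below is a hypothetical stand-in for JobInfoDO):

    import org.springframework.expression.ExpressionParser;
    import org.springframework.expression.spel.standard.SpelExpressionParser;
    import org.springframework.expression.spel.support.StandardEvaluationContext;

    public class LockKeyDemo {

        // hypothetical stand-in for an empty JobInfoDO: the id is still null
        public static class JobInfo {
            public Long getId() { return null; }
        }

        public static void main(String[] args) {
            ExpressionParser parser = new SpelExpressionParser();
            StandardEvaluationContext ctx = new StandardEvaluationContext();
            ctx.setVariable("jobInfo", new JobInfo());

            // new key: the Elvis operator falls back to 0 when getId() returns null
            Object key = parser.parseExpression("#jobInfo.getId() ?: 0").getValue(ctx);
            System.out.println(key); // prints 0

            // old key: "#jobInfo.getId().intValue()" would throw here, because
            // intValue() is invoked on a null id
        }
    }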

View File

@@ -185,7 +185,7 @@ public class JobService {
     if (!TimeExpressionType.frequentTypes.contains(jobInfoDO.getTimeExpressionType())) {
         return;
     }
-    List<InstanceInfoDO> executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus);
+    List<InstanceInfoDO> executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.GENERALIZED_RUNNING_STATUS);
     if (CollectionUtils.isEmpty(executeLogs)) {
         return;
     }

View File

@@ -107,7 +107,7 @@ public class InstanceService {
     InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
     // check the status: only running instances can be stopped
-    if (!InstanceStatus.generalizedRunningStatus.contains(instanceInfo.getStatus())) {
+    if (!InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(instanceInfo.getStatus())) {
         throw new IllegalArgumentException("can't stop finished instance!");
     }
@@ -153,7 +153,7 @@ public class InstanceService {
     log.info("[Instance-{}] retry instance in appId: {}", instanceId, appId);
     InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
-    if (!InstanceStatus.finishedStatus.contains(instanceInfo.getStatus())) {
+    if (!InstanceStatus.FINISHED_STATUS.contains(instanceInfo.getStatus())) {
         throw new PowerJobException("Only stopped instance can be retry!");
     }
     // retrying workflow tasks is not supported yet

View File

@@ -51,7 +51,9 @@ public class CleanService {
     private static final int TEMPORARY_RETENTION_DAY = 3;
-    // scheduled cleanup at 3 AM every day
+    /**
+     * Scheduled cleanup at 3 AM every day.
+     */
     private static final String CLEAN_TIME_EXPRESSION = "0 0 3 * * ?";
     private static final String HISTORY_DELETE_LOCK = "history_delete_lock";
@@ -152,7 +154,7 @@ public class CleanService {
     }
     try {
         Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
-        int num = instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, InstanceStatus.finishedStatus);
+        int num = instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, InstanceStatus.FINISHED_STATUS);
         log.info("[CleanService] deleted {} instanceInfo records whose modify time before {}.", num, t);
     } catch (Exception e) {
         log.warn("[CleanService] clean instanceInfo failed.", e);

View File

@@ -228,7 +228,7 @@ public class OmsScheduleService {
         return;
     }
     // check the instance log table for related jobs
-    List<Long> runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.generalizedRunningStatus);
+    List<Long> runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.GENERALIZED_RUNNING_STATUS);
     Set<Long> runningJobIdSet = Sets.newHashSet(runningJobIdList);
     List<Long> notRunningJobIds = Lists.newLinkedList();

View File

@@ -8,13 +8,11 @@ import com.github.kfcfans.powerjob.common.utils.JsonUtils;
 import com.github.kfcfans.powerjob.common.utils.SegmentLock;
 import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
 import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils;
-import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
-import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
-import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO;
-import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInstanceInfoDO;
+import com.github.kfcfans.powerjob.server.persistence.core.model.*;
 import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
 import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
 import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
+import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowNodeInfoRepository;
 import com.github.kfcfans.powerjob.server.service.DispatchService;
 import com.github.kfcfans.powerjob.server.service.UserService;
 import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter;
@@ -24,6 +22,7 @@ import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
 import com.github.kfcfans.powerjob.server.service.lock.local.UseSegmentLock;
 import com.google.common.collect.*;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.BeanUtils;
 import org.springframework.stereotype.Service;
@@ -55,6 +54,8 @@ public class WorkflowInstanceManager {
     private WorkflowInfoRepository workflowInfoRepository;
     @Resource
     private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
+    @Resource
+    private WorkflowNodeInfoRepository workflowNodeInfoRepository;
     private final SegmentLock segmentLock = new SegmentLock(16);
@@ -75,7 +76,7 @@ public class WorkflowInstanceManager {
     Long wfId = wfInfo.getId();
     Long wfInstanceId = idGenerateService.allocate();
-    // create only, without writing the DAG info
+    // create and initialize the DAG info
     Date now = new Date();
     WorkflowInstanceInfoDO newWfInstance = new WorkflowInstanceInfoDO();
     newWfInstance.setAppId(wfInfo.getAppId());
@@ -100,11 +101,22 @@ public class WorkflowInstanceManager {
     int needNum = allJobIds.size();
     long dbNum = jobInfoRepository.countByAppIdAndStatusAndIdIn(wfInfo.getAppId(), SwitchableStatus.ENABLE.getV(), allJobIds);
     log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum);
+    // set the DAG once: if anything below fails, this is the version that gets persisted
+    newWfInstance.setDag(JSON.toJSONString(dag));
     if (dbNum < allJobIds.size()) {
         log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", wfId, wfInstanceId, needNum, dbNum);
         onWorkflowInstanceFailed(SystemInstanceResult.CAN_NOT_FIND_JOB, newWfInstance);
     } else {
+        // validate the DAG once more
+        boolean valid = WorkflowDAGUtils.valid(dag);
+        if (!valid) {
+            log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you have modified the DAG info directly in the database!", wfId, wfInstanceId);
+            onWorkflowInstanceFailed(SystemInstanceResult.INVALID_DAG, newWfInstance);
+        } else {
+            initNodeInfo(dag);
+            // set the DAG again: by now the node info in the workflow is fully initialized
+            newWfInstance.setDag(JSON.toJSONString(dag));
+        }
         workflowInstanceInfoRepository.saveAndFlush(newWfInstance);
     }
     return wfInstanceId;
@@ -145,24 +157,16 @@ public class WorkflowInstanceManager {
     }
     try {
-        PEWorkflowDAG peWorkflowDAG = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
+        // read the workflow (DAG) info from the instance itself
+        PEWorkflowDAG peWorkflowDAG = JSON.parseObject(wfInstanceInfo.getDag(), PEWorkflowDAG.class);
         List<PEWorkflowDAG.Node> roots = WorkflowDAGUtils.listRoots(peWorkflowDAG);
         peWorkflowDAG.getNodes().forEach(node -> node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()));
-        Map<Long, JobInfoDO> nodeId2JobInfoMap = Maps.newHashMap();
         // create all root tasks
         roots.forEach(root -> {
             // note: every task instance must be created successfully here; if creation partially fails, the DAG info is never updated and the already-created instance nodes cannot be shown in the workflow log
-            JobInfoDO jobInfo = jobInfoRepository.findById(root.getJobId()).orElseGet(JobInfoDO::new);
-            if (jobInfo.getId() == null) {
-                // the job info was physically deleted between creating the workflow instance and now
-                log.error("[Workflow-{}|{}]job info of current node(nodeId={},jobId={}) is not present! maybe you have deleted the job!", wfInfo.getId(), wfInstanceId, root.getNodeId(), root.getJobId());
-            }
-            nodeId2JobInfoMap.put(root.getNodeId(), jobInfo);
+            // if the job info is missing, dispatch will fail, which in turn fails the whole workflow
             // instanceParam carries the wfContext of the workflow instance
-            Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), jobInfo.getJobParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis());
+            Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), root.getJobParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis());
             root.setInstanceId(instanceId);
             root.setStatus(InstanceStatus.RUNNING.getV());
@@ -176,7 +180,7 @@ public class WorkflowInstanceManager {
     log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId);
     // actually start executing the root tasks
-    roots.forEach(root -> runInstance(nodeId2JobInfoMap.get(root.getNodeId()), root.getInstanceId()));
+    roots.forEach(this::runInstance);
 } catch (Exception e) {
     log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);
@@ -226,19 +230,24 @@ public class WorkflowInstanceManager {
     // update the status of the finished node
     boolean allFinished = true;
+    PEWorkflowDAG.Node instanceNode = null;
     for (PEWorkflowDAG.Node node : dag.getNodes()) {
         if (instanceId.equals(node.getInstanceId())) {
             node.setStatus(status.getV());
             node.setResult(result);
+            instanceNode = node;
             log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getNodeId(), node.getJobId(), instanceId, status.name(), result);
         }
-        if (InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) {
+        if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {
             allFinished = false;
         }
         nodeId2Node.put(node.getNodeId(), node);
     }
+    if (instanceNode == null) {
+        // the node's instance in the DAG has been overwritten (an in-place retry generated new instance info), so just ignore this one
+        log.warn("[Workflow-{}|{}] current job instance(instanceId={}) is dissociative! it will be ignored!", wfId, wfInstanceId, instanceId);
+        return;
+    }
     wfInstance.setGmtModified(new Date());
     wfInstance.setDag(JSON.toJSONString(dag));
@@ -249,8 +258,8 @@ public class WorkflowInstanceManager {
         return;
     }
-    // the task failed: the DAG flow is interrupted and the whole workflow fails
-    if (status == InstanceStatus.FAILED) {
+    // the task failed and skip-on-failure is not allowed: the DAG flow is interrupted and the whole workflow fails
+    if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) {
         log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId);
         onWorkflowInstanceFailed(SystemInstanceResult.MIDDLE_JOB_FAILED, wfInstance);
         return;
@@ -285,28 +294,25 @@ public class WorkflowInstanceManager {
     // recompute the nodes that are ready for dispatch
     List<PEWorkflowDAG.Node> readyNodes = Lists.newArrayList();
-    Map<Long, JobInfoDO> nodeId2JobInfoMap = Maps.newHashMap();
     relyMap.keySet().forEach(nodeId -> {
         PEWorkflowDAG.Node currentNode = nodeId2Node.get(nodeId);
-        // skip finished nodes (in theory FAILED cannot appear here) and already-dispatched nodes (InstanceId present)
-        if (currentNode.getStatus() == InstanceStatus.SUCCEED.getV() || currentNode.getInstanceId() != null) {
+        // skip finished nodes (SUCCEED or FAILED) and already-dispatched nodes (InstanceId present)
+        if (currentNode.getStatus() == InstanceStatus.SUCCEED.getV()
+                || currentNode.getStatus() == InstanceStatus.FAILED.getV()
+                || currentNode.getInstanceId() != null) {
             return;
         }
-        // check the completion of all dependencies: if any one has not succeeded, this node cannot run
-        for (Long reliedJobId : relyMap.get(nodeId)) {
-            if (nodeId2Node.get(reliedJobId).getStatus() != InstanceStatus.SUCCEED.getV()) {
+        // check the completion of all dependencies: if any one is unfinished, this node cannot run
+        for (Long reliedNodeId : relyMap.get(nodeId)) {
+            // note: FAILED is allowed here because skip-on-failure nodes exist; a failed node that must not be skipped never reaches this point (the workflow is interrupted first)
+            if (nodeId2Node.get(reliedNodeId).getStatus() != InstanceStatus.SUCCEED.getV()
+                    && nodeId2Node.get(reliedNodeId).getStatus() != InstanceStatus.FAILED.getV()) {
                 return;
             }
         }
         // likewise, every task instance must be created successfully here, to avoid partial failures leaving already-created instance nodes unshown in the workflow log
-        JobInfoDO jobInfo = jobInfoRepository.findById(currentNode.getJobId()).orElseGet(JobInfoDO::new);
-        if (jobInfo.getId() == null) {
-            // the job info was physically deleted between creating the workflow instance and now
-            log.error("[Workflow-{}|{}]job info of current node(nodeId={},jobId={}) is not present! maybe you have deleted the job!", wfId, wfInstanceId, currentNode.getNodeId(), currentNode.getJobId());
-        }
-        nodeId2JobInfoMap.put(nodeId, jobInfo);
         // instanceParam carries the wfContext of the workflow instance
-        Long newInstanceId = instanceService.create(jobInfo.getId(), wfInstance.getAppId(), jobInfo.getJobParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
+        Long newInstanceId = instanceService.create(currentNode.getJobId(), wfInstance.getAppId(), currentNode.getJobParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
         currentNode.setInstanceId(newInstanceId);
         currentNode.setStatus(InstanceStatus.RUNNING.getV());
         readyNodes.add(currentNode);
@@ -316,7 +322,7 @@ public class WorkflowInstanceManager {
     wfInstance.setDag(JSON.toJSONString(dag));
     workflowInstanceInfoRepository.saveAndFlush(wfInstance);
     // once persisted, start dispatching all the ready tasks
-    readyNodes.forEach(node -> runInstance(nodeId2JobInfoMap.get(node.getNodeId()), node.getInstanceId()));
+    readyNodes.forEach(this::runInstance);
 } catch (Exception e) {
     onWorkflowInstanceFailed("MOVE NEXT STEP FAILED: " + e.getMessage(), wfInstance);
@@ -332,9 +338,10 @@ public class WorkflowInstanceManager {
 /**
  * Update the workflow context.
- * @since 2021/02/05
+ *
  * @param wfInstanceId          workflow instance ID
  * @param appendedWfContextData appended context data
+ * @since 2021/02/05
  */
 @UseSegmentLock(type = "updateWfContext", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
 public void updateWorkflowContext(Long wfInstanceId, Map<String, String> appendedWfContextData) {
@@ -362,17 +369,53 @@ public class WorkflowInstanceManager {
     }
+    /**
+     * Initialize node info.
+     *
+     * @param dag the PE DAG
+     * @since 20210205
+     */
+    private void initNodeInfo(PEWorkflowDAG dag) {
+        // initialize node info: enabled? allowed to skip on failure? node params
+        for (PEWorkflowDAG.Node node : dag.getNodes()) {
+            Optional<WorkflowNodeInfoDO> nodeInfoOpt = workflowNodeInfoRepository.findById(node.getNodeId());
+            // extreme corner cases are not considered here
+            JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new);
+            if (!nodeInfoOpt.isPresent()) {
+                // defaults: enabled + skip-on-failure not allowed
+                node.setEnable(true);
+                node.setSkipWhenFailed(false);
+                node.setJobParams(jobInfo.getJobParams());
+            } else {
+                WorkflowNodeInfoDO nodeInfo = nodeInfoOpt.get();
+                node.setEnable(nodeInfo.getEnable());
+                node.setSkipWhenFailed(nodeInfo.getSkipWhenFailed());
+                // if params are specified on the node, use them; otherwise fall back to the job's params
+                if (!StringUtils.isBlank(nodeInfo.getNodeParams())) {
+                    node.setJobParams(nodeInfo.getNodeParams());
+                } else {
+                    node.setJobParams(jobInfo.getJobParams());
+                }
+            }
+        }
+    }
     /**
      * Run a task instance.
      * Creating and running a task instance must stay separated; otherwise an instant (sub-second) failure would cause the DAG to be overwritten on update.
      *
-     * @param jobInfo    job info
-     * @param instanceId task instance ID
+     * @param node node info
      */
-    private void runInstance(JobInfoDO jobInfo, Long instanceId) {
+    private void runInstance(PEWorkflowDAG.Node node) {
+        JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new);
         // scrub the time expression type
         jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV());
-        dispatchService.dispatch(jobInfo, instanceId);
+        dispatchService.dispatch(jobInfo, node.getInstanceId());
     }
     private void onWorkflowInstanceFailed(String result, WorkflowInstanceInfoDO wfInstance) {
@@ -400,4 +443,9 @@ public class WorkflowInstanceManager {
             // ignore
         }
     }
+
+    private boolean isNotAllowSkipWhenFailed(PEWorkflowDAG.Node node) {
+        // skipping failed nodes is not allowed by default
+        return node.getSkipWhenFailed() == null || !node.getSkipWhenFailed();
+    }
 }
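Taken together, isNotAllowSkipWhenFailed and the relaxed dependency check earlier in this file define the whole skip-on-failure rule: a failure is fatal only when the node's skipWhenFailed flag is unset or false, and a skipped failure unblocks successors just like a success. A self-contained sketch of both predicates (Node is a hypothetical stand-in for PEWorkflowDAG.Node; the status codes follow InstanceStatus):

    public class SkipWhenFailedDemo {

        static final int SUCCEED = 4;
        static final int FAILED = 5;

        // hypothetical stand-in for PEWorkflowDAG.Node
        static class Node {
            Integer status;
            Boolean skipWhenFailed;
            Node(Integer status, Boolean skipWhenFailed) {
                this.status = status;
                this.skipWhenFailed = skipWhenFailed;
            }
        }

        // mirrors isNotAllowSkipWhenFailed: a null flag means "do not skip"
        static boolean failureInterruptsWorkflow(Node node) {
            boolean notSkippable = node.skipWhenFailed == null || !node.skipWhenFailed;
            return node.status == FAILED && notSkippable;
        }

        // mirrors the relaxed dependency check: SUCCEED or FAILED both unblock
        // successors (a non-skippable FAILED node never gets here, because the
        // whole workflow fails first)
        static boolean dependencySatisfied(Node relied) {
            return relied.status == SUCCEED || relied.status == FAILED;
        }

        public static void main(String[] args) {
            Node skippable = new Node(FAILED, true);
            Node strict = new Node(FAILED, null);
            System.out.println(failureInterruptsWorkflow(skippable)); // false
            System.out.println(failureInterruptsWorkflow(strict));    // true
            System.out.println(dependencySatisfied(skippable));       // true
        }
    }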

View File

@@ -1,6 +1,6 @@
 package com.github.kfcfans.powerjob.server.service.workflow;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSON;
 import com.github.kfcfans.powerjob.common.InstanceStatus;
 import com.github.kfcfans.powerjob.common.PowerJobException;
 import com.github.kfcfans.powerjob.common.SystemInstanceResult;
@@ -46,10 +46,10 @@ public class WorkflowInstanceService {
         throw new PowerJobException("workflow instance already stopped");
     }
     // stop everything that has started and not yet finished
-    PEWorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
+    PEWorkflowDAG workflowDAG = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
     WorkflowDAGUtils.listRoots(workflowDAG).forEach(node -> {
         try {
-            if (node.getInstanceId() != null && InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) {
+            if (node.getInstanceId() != null && InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {
                 log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId());
                 node.setStatus(InstanceStatus.STOPPED.getV());
                 node.setResult(SystemInstanceResult.STOPPED_BY_USER);
@@ -57,7 +57,7 @@ public class WorkflowInstanceService {
                 instanceService.stopInstance(node.getInstanceId());
             }
         } catch (Exception e) {
-            log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSONObject.toJSONString(node), e);
+            log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSON.toJSONString(node), e);
         }
     });

View File

@@ -106,7 +106,7 @@ public class RepositoryTest {
     @Test
     public void testDeleteInstanceInfo() {
-        instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), InstanceStatus.finishedStatus);
+        instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), InstanceStatus.FINISHED_STATUS);
     }
     @Test

View File

@@ -1,7 +1,6 @@
 package com.github.kfcfans.powerjob.samples.tester;
 import com.github.kfcfans.powerjob.common.WorkflowContextConstant;
-import com.github.kfcfans.powerjob.common.utils.JsonUtils;
 import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
 import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
 import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
@@ -19,6 +18,8 @@ import java.util.Map;
 @Component
 public class AppendWorkflowContextTester implements BasicProcessor {
+    private static final String FAIL_CODE = "0";
+
     @Override
     @SuppressWarnings("squid:S106")
@@ -29,6 +30,7 @@ public class AppendWorkflowContextTester implements BasicProcessor {
         System.out.println("======= AppendWorkflowContextTester#start =======");
         System.out.println("current instance id : " + context.getInstanceId());
         System.out.println("current workflow context : " + workflowContext);
+        System.out.println("current job param : " + context.getJobParams());
         System.out.println("initParam of workflow context : " + originValue);
         int num = 0;
         try {
@@ -38,6 +40,9 @@ public class AppendWorkflowContextTester implements BasicProcessor {
         }
         context.appendData2WfContext(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY, num + 1);
         System.out.println("======= AppendWorkflowContextTester#end =======");
-        return new ProcessResult(true, JsonUtils.toJSONString(context));
+        if (FAIL_CODE.equals(context.getJobParams())) {
+            return new ProcessResult(false, "Failed!");
+        }
+        return new ProcessResult(true, "Success!");
     }
 }
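Combined with the node-level params introduced in initNodeInfo, this sample doubles as an end-to-end test for the new feature: point a workflow node at this processor, set that node's params to "0" so the instance fails, and the workflow should then survive or fail depending solely on the node's skipWhenFailed flag.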