fix: some problems when stopping a workflow instance

This commit is contained in:
Echo009 2021-03-03 19:52:44 +08:00
parent 8f9e53ed83
commit 5f1ab82f0e
4 changed files with 21 additions and 13 deletions

View File

@ -12,12 +12,14 @@ import lombok.Getter;
@Getter @Getter
@AllArgsConstructor @AllArgsConstructor
public enum SwitchableStatus { public enum SwitchableStatus {
/**
*
*/
ENABLE(1), ENABLE(1),
DISABLE(2), DISABLE(2),
DELETED(99); DELETED(99);
private int v; private final int v;
public static SwitchableStatus of(int v) { public static SwitchableStatus of(int v) {
for (SwitchableStatus type : values()) { for (SwitchableStatus type : values()) {

View File

@ -34,11 +34,11 @@ public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long>, JpaSp
/** /**
* 校验工作流包含的任务 * 校验工作流包含的任务
* @param appId APP ID * @param appId APP ID
* @param status 状态 * @param statusSet 状态列表
* @param jobIds 任务ID * @param jobIds 任务ID
* @return 数量 * @return 数量
*/ */
long countByAppIdAndStatusAndIdIn(Long appId, int status, Set<Long> jobIds); long countByAppIdAndStatusInAndIdIn(Long appId, Set<Integer> statusSet , Set<Long> jobIds);
long countByAppIdAndStatusNot(long appId, int status); long countByAppIdAndStatusNot(long appId, int status);

View File

@ -119,7 +119,8 @@ public class WorkflowInstanceManager {
node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); node.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
}); });
int needNum = allJobIds.size(); int needNum = allJobIds.size();
long dbNum = jobInfoRepository.countByAppIdAndStatusAndIdIn(wfInfo.getAppId(), SwitchableStatus.ENABLE.getV(), allJobIds); // 检查工作流中的任务是否均处于可用状态没有被删除
long dbNum = jobInfoRepository.countByAppIdAndStatusInAndIdIn(wfInfo.getAppId(), Sets.newHashSet(SwitchableStatus.ENABLE.getV(), SwitchableStatus.DISABLE.getV()), allJobIds);
log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum); log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum);
// set 一次异常的话直接存这个信息 // set 一次异常的话直接存这个信息
newWfInstance.setDag(JSON.toJSONString(dag)); newWfInstance.setDag(JSON.toJSONString(dag));

View File

@ -58,14 +58,15 @@ public class WorkflowInstanceService {
throw new PowerJobException("workflow instance already stopped"); throw new PowerJobException("workflow instance already stopped");
} }
// 停止所有已启动且未完成的服务 // 停止所有已启动且未完成的服务
PEWorkflowDAG workflowDAG = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
WorkflowDAGUtils.listRoots(workflowDAG).forEach(node -> { // 遍历所有节点终止正在运行的
dag.getNodes().forEach(node -> {
try { try {
if (node.getInstanceId() != null && InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) { if (node.getInstanceId() != null && InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {
log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId()); log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId());
node.setStatus(InstanceStatus.STOPPED.getV()); node.setStatus(InstanceStatus.STOPPED.getV());
node.setResult(SystemInstanceResult.STOPPED_BY_USER); node.setResult(SystemInstanceResult.STOPPED_BY_USER);
// 注意这里并不保证一定能终止正在运行的实例
instanceService.stopInstance(node.getInstanceId()); instanceService.stopInstance(node.getInstanceId());
} }
} catch (Exception e) { } catch (Exception e) {
@ -74,6 +75,7 @@ public class WorkflowInstanceService {
}); });
// 修改数据库状态 // 修改数据库状态
wfInstance.setDag(JSON.toJSONString(dag));
wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV()); wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV());
wfInstance.setResult(SystemInstanceResult.STOPPED_BY_USER); wfInstance.setResult(SystemInstanceResult.STOPPED_BY_USER);
wfInstance.setGmtModified(new Date()); wfInstance.setGmtModified(new Date());
@ -97,14 +99,17 @@ public class WorkflowInstanceService {
if (wfInstance.getStatus() == WorkflowInstanceStatus.SUCCEED.getV()) { if (wfInstance.getStatus() == WorkflowInstanceStatus.SUCCEED.getV()) {
throw new PowerJobException("workflow instance is already successful"); throw new PowerJobException("workflow instance is already successful");
} }
// 因为 DAG 非法而终止的工作流实例无法重试 // 因为 DAG 非法 或者 因任务信息缺失 而失败的工作流实例无法重试
if (SystemInstanceResult.CAN_NOT_FIND_JOB.equals(wfInstance.getResult())) {
throw new PowerJobException("you can't retry the workflow instance which is missing job info!");
}
// 校验 DAG 信息
PEWorkflowDAG dag = null; PEWorkflowDAG dag = null;
try { try {
dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
if (!WorkflowDAGUtils.valid(dag)) { if (!WorkflowDAGUtils.valid(dag)) {
throw new PowerJobException(SystemInstanceResult.INVALID_DAG); throw new PowerJobException(SystemInstanceResult.INVALID_DAG);
} }
} catch (Exception e) { } catch (Exception e) {
throw new PowerJobException("you can't retry the workflow instance whose DAG is illegal!"); throw new PowerJobException("you can't retry the workflow instance whose DAG is illegal!");
} }
@ -113,10 +118,10 @@ public class WorkflowInstanceService {
if (!workflowInfo.isPresent() || workflowInfo.get().getStatus() == SwitchableStatus.DISABLE.getV()) { if (!workflowInfo.isPresent() || workflowInfo.get().getStatus() == SwitchableStatus.DISABLE.getV()) {
throw new PowerJobException("you can't retry the workflow instance whose metadata is unavailable!"); throw new PowerJobException("you can't retry the workflow instance whose metadata is unavailable!");
} }
// 将需要重试的节点状态重置失败且不允许跳过的 // 将需要重试的节点状态重置失败且不允许跳过的 或者 手动终止的
for (PEWorkflowDAG.Node node : dag.getNodes()) { for (PEWorkflowDAG.Node node : dag.getNodes()) {
if (node.getStatus() == InstanceStatus.FAILED.getV() boolean realFailed = node.getStatus() == InstanceStatus.FAILED.getV() && isNotAllowSkipWhenFailed(node);
&& isNotAllowSkipWhenFailed(node)) { if (realFailed || node.getStatus() == InstanceStatus.STOPPED.getV()) {
node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()).setInstanceId(null); node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()).setInstanceId(null);
} }
} }