diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/SwitchableStatus.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/SwitchableStatus.java index 26ebd74e..ca760539 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/SwitchableStatus.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/SwitchableStatus.java @@ -12,12 +12,14 @@ import lombok.Getter; @Getter @AllArgsConstructor public enum SwitchableStatus { - + /** + * + */ ENABLE(1), DISABLE(2), DELETED(99); - private int v; + private final int v; public static SwitchableStatus of(int v) { for (SwitchableStatus type : values()) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java index 9ef854ca..60cb83ba 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java @@ -34,11 +34,11 @@ public interface JobInfoRepository extends JpaRepository, JpaSp /** * 校验工作流包含的任务 * @param appId APP ID - * @param status 状态 + * @param statusSet 状态列表 * @param jobIds 任务ID * @return 数量 */ - long countByAppIdAndStatusAndIdIn(Long appId, int status, Set jobIds); + long countByAppIdAndStatusInAndIdIn(Long appId, Set statusSet , Set jobIds); long countByAppIdAndStatusNot(long appId, int status); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index a15140e5..1084c4b4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -119,7 +119,8 @@ public class WorkflowInstanceManager { node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); }); int needNum = allJobIds.size(); - long dbNum = jobInfoRepository.countByAppIdAndStatusAndIdIn(wfInfo.getAppId(), SwitchableStatus.ENABLE.getV(), allJobIds); + // 检查工作流中的任务是否均处于可用状态(没有被删除) + long dbNum = jobInfoRepository.countByAppIdAndStatusInAndIdIn(wfInfo.getAppId(), Sets.newHashSet(SwitchableStatus.ENABLE.getV(), SwitchableStatus.DISABLE.getV()), allJobIds); log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum); // 先 set 一次,异常的话直接存这个信息 newWfInstance.setDag(JSON.toJSONString(dag)); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java index 6a4ad32a..6d5fc14f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java @@ -58,14 +58,15 @@ public class WorkflowInstanceService { throw new PowerJobException("workflow instance already stopped"); } // 停止所有已启动且未完成的服务 - PEWorkflowDAG workflowDAG = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); - WorkflowDAGUtils.listRoots(workflowDAG).forEach(node -> { + PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); + // 遍历所有节点,终止正在运行的 + dag.getNodes().forEach(node -> { try { if (node.getInstanceId() != null && InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) { log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId()); node.setStatus(InstanceStatus.STOPPED.getV()); node.setResult(SystemInstanceResult.STOPPED_BY_USER); - + // 注意,这里并不保证一定能终止正在运行的实例 instanceService.stopInstance(node.getInstanceId()); } } catch (Exception e) { @@ -74,6 +75,7 @@ public class WorkflowInstanceService { }); // 修改数据库状态 + wfInstance.setDag(JSON.toJSONString(dag)); wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV()); wfInstance.setResult(SystemInstanceResult.STOPPED_BY_USER); wfInstance.setGmtModified(new Date()); @@ -97,14 +99,17 @@ public class WorkflowInstanceService { if (wfInstance.getStatus() == WorkflowInstanceStatus.SUCCEED.getV()) { throw new PowerJobException("workflow instance is already successful"); } - // 因为 DAG 非法而终止的工作流实例无法重试 + // 因为 DAG 非法 或者 因任务信息缺失 而失败的工作流实例无法重试 + if (SystemInstanceResult.CAN_NOT_FIND_JOB.equals(wfInstance.getResult())) { + throw new PowerJobException("you can't retry the workflow instance which is missing job info!"); + } + // 校验 DAG 信息 PEWorkflowDAG dag = null; try { dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); if (!WorkflowDAGUtils.valid(dag)) { throw new PowerJobException(SystemInstanceResult.INVALID_DAG); } - } catch (Exception e) { throw new PowerJobException("you can't retry the workflow instance whose DAG is illegal!"); } @@ -113,10 +118,10 @@ public class WorkflowInstanceService { if (!workflowInfo.isPresent() || workflowInfo.get().getStatus() == SwitchableStatus.DISABLE.getV()) { throw new PowerJobException("you can't retry the workflow instance whose metadata is unavailable!"); } - // 将需要重试的节点状态重置(失败且不允许跳过的) + // 将需要重试的节点状态重置(失败且不允许跳过的 或者 手动终止的) for (PEWorkflowDAG.Node node : dag.getNodes()) { - if (node.getStatus() == InstanceStatus.FAILED.getV() - && isNotAllowSkipWhenFailed(node)) { + boolean realFailed = node.getStatus() == InstanceStatus.FAILED.getV() && isNotAllowSkipWhenFailed(node); + if (realFailed || node.getStatus() == InstanceStatus.STOPPED.getV()) { node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()).setInstanceId(null); } }