diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/WorkflowContextConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/WorkflowContextConstant.java new file mode 100644 index 00000000..8597d119 --- /dev/null +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/WorkflowContextConstant.java @@ -0,0 +1,18 @@ +package com.github.kfcfans.powerjob.common; + +/** + * 工作流上下文相关常量 + * + * @author Echo009 + * @since 2021/2/3 + */ +public final class WorkflowContextConstant { + + /** + * 上下文初始参数 + */ + public static final String CONTEXT_INIT_PARAMS_KEY = "initParams"; + + + +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java index d1504db8..6ac7d823 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/WorkflowDAGUtils.java @@ -21,6 +21,9 @@ import java.util.Set; */ public class WorkflowDAGUtils { + private WorkflowDAGUtils() { + + } /** * 获取所有根节点 * @@ -29,11 +32,11 @@ public class WorkflowDAGUtils { */ public static List listRoots(PEWorkflowDAG peWorkflowDAG) { - Map jobId2Node = Maps.newHashMap(); - peWorkflowDAG.getNodes().forEach(node -> jobId2Node.put(node.getJobId(), node)); - peWorkflowDAG.getEdges().forEach(edge -> jobId2Node.remove(edge.getTo())); + Map nodeId2Node = Maps.newHashMap(); + peWorkflowDAG.getNodes().forEach(node -> nodeId2Node.put(node.getNodeId(), node)); + peWorkflowDAG.getEdges().forEach(edge -> nodeId2Node.remove(edge.getTo())); - return Lists.newLinkedList(jobId2Node.values()); + return Lists.newLinkedList(nodeId2Node.values()); } /** @@ -44,13 +47,13 @@ public class WorkflowDAGUtils { */ public static boolean valid(PEWorkflowDAG peWorkflowDAG) { - // 点不允许重复,一个工作流中某个任务只允许出现一次 
- Set jobIds = Sets.newHashSet(); + // 节点ID + Set nodeIds = Sets.newHashSet(); for (PEWorkflowDAG.Node n : peWorkflowDAG.getNodes()) { - if (jobIds.contains(n.getJobId())) { + if (nodeIds.contains(n.getNodeId())) { return false; } - jobIds.add(n.getJobId()); + nodeIds.add(n.getNodeId()); } try { @@ -64,6 +67,7 @@ public class WorkflowDAGUtils { } return true; } catch (Exception ignore) { + // ignore } return false; } @@ -71,29 +75,29 @@ public class WorkflowDAGUtils { /** * 将点线表示法的DAG图转化为引用表达法的DAG图 * - * @param PEWorkflowDAG 点线表示法的DAG图 + * @param peWorkflowDAG 点线表示法的DAG图 * @return 引用表示法的DAG图 */ - public static WorkflowDAG convert(PEWorkflowDAG PEWorkflowDAG) { + public static WorkflowDAG convert(PEWorkflowDAG peWorkflowDAG) { Set rootIds = Sets.newHashSet(); Map id2Node = Maps.newHashMap(); - if (PEWorkflowDAG.getNodes() == null || PEWorkflowDAG.getNodes().isEmpty()) { + if (peWorkflowDAG.getNodes() == null || peWorkflowDAG.getNodes().isEmpty()) { throw new PowerJobException("empty graph"); } // 创建节点 - PEWorkflowDAG.getNodes().forEach(node -> { - Long jobId = node.getJobId(); - WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), node.getNodeId(), jobId, node.getJobName(), InstanceStatus.WAITING_DISPATCH.getV()); - id2Node.put(jobId, n); + peWorkflowDAG.getNodes().forEach(node -> { + Long nodeId = node.getNodeId(); + WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), node.getNodeId(), node.getJobId(), node.getJobName(), InstanceStatus.WAITING_DISPATCH.getV()); + id2Node.put(nodeId, n); // 初始阶段,每一个点都设为顶点 - rootIds.add(jobId); + rootIds.add(nodeId); }); // 连接图像 - PEWorkflowDAG.getEdges().forEach(edge -> { + peWorkflowDAG.getEdges().forEach(edge -> { WorkflowDAG.Node from = id2Node.get(edge.getFrom()); WorkflowDAG.Node to = id2Node.get(edge.getTo()); @@ -104,12 +108,12 @@ public class WorkflowDAGUtils { from.getSuccessors().add(to); // 被连接的点不可能成为 root,移除 - rootIds.remove(to.getJobId()); + rootIds.remove(to.getNodeId()); }); // 合法性校验(至少存在一个顶点) 
- if (rootIds.size() < 1) { - throw new PowerJobException("Illegal DAG: " + JsonUtils.toJSONString(PEWorkflowDAG)); + if (rootIds.isEmpty()) { + throw new PowerJobException("Illegal DAG: " + JsonUtils.toJSONString(peWorkflowDAG)); } List roots = Lists.newLinkedList(); @@ -121,13 +125,13 @@ public class WorkflowDAGUtils { private static boolean invalidPath(WorkflowDAG.Node root, Set ids) { // 递归出口(出现之前的节点则代表有环,失败;出现无后继者节点,则说明该路径成功) - if (ids.contains(root.getJobId())) { + if (ids.contains(root.getNodeId())) { return true; } if (root.getSuccessors().isEmpty()) { return false; } - ids.add(root.getJobId()); + ids.add(root.getNodeId()); for (WorkflowDAG.Node node : root.getSuccessors()) { if (invalidPath(node, Sets.newHashSet(ids))) { return true; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java index 061dd1de..a23e0a01 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java @@ -38,6 +38,13 @@ public class InstanceInfoDO { * 任务所属应用的ID,冗余提高查询效率 */ private Long instanceId; + /** + * 任务参数(静态) + * @since 2021/2/01 + */ + @Lob + @Column + private String jobParams; /** * 任务实例参数(动态) */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java index 0872cbab..9def5894 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java @@ 
-6,6 +6,7 @@ import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; import javax.transaction.Transactional; import java.util.Date; @@ -29,49 +30,83 @@ public interface InstanceInfoRepository extends JpaRepository findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List jobIds, int status, long time); + List findByAppIdInAndStatusAndActualTriggerTimeLessThan(List jobIds, int status, long time); + List findByAppIdInAndStatusAndGmtModifiedBefore(List jobIds, int status, Date time); + InstanceInfoDO findByInstanceId(long instanceId); - // 数据统计 + /* --数据统计-- */ + long countByAppIdAndStatus(long appId, int status); + long countByAppIdAndStatusAndGmtCreateAfter(long appId, int status, Date time); @Query(value = "select distinct jobId from InstanceInfoDO where jobId in ?1 and status in ?2") List findByJobIdInAndStatusIn(List jobIds, List status); - // 删除历史数据,JPA自带的删除居然是根据ID循环删,2000条数据删了几秒,也太拉垮了吧... - // 结果只能用 int 接收 + /** + * 删除历史数据,JPA自带的删除居然是根据ID循环删,2000条数据删了几秒,也太拉垮了吧... 
+ * 结果只能用 int 接收 + * + * @param time 更新时间阈值 + * @param status 状态 + * @return 删除记录数 + */ @Modifying - @Transactional + @Transactional(rollbackOn = Exception.class) @Query(value = "delete from InstanceInfoDO where gmtModified < ?1 and status in ?2") int deleteAllByGmtModifiedBeforeAndStatusIn(Date time, List status); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java index 146b3711..9ef854ca 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java @@ -8,6 +8,7 @@ import org.springframework.data.jpa.repository.JpaSpecificationExecutor; import org.springframework.data.jpa.repository.Query; import java.util.List; +import java.util.Set; /** * JobInfo 数据访问层 @@ -18,7 +19,9 @@ import java.util.List; public interface JobInfoRepository extends JpaRepository, JpaSpecificationExecutor { - // 调度专用 + /** + * 调度专用 + */ List findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List appIds, int status, int timeExpressionType, long time); @Query(value = "select id from JobInfoDO where appId in ?1 and status = ?2 and timeExpressionType in ?3") @@ -28,8 +31,14 @@ public interface JobInfoRepository extends JpaRepository, JpaSp Page findByAppIdAndJobNameLikeAndStatusNot(Long appId, String condition, int status, Pageable pageable); - // 校验工作流包含的任务 - long countByAppIdAndStatusAndIdIn(Long appId, int status, List jobIds); + /** + * 校验工作流包含的任务 + * @param appId APP ID + * @param status 状态 + * @param jobIds 任务ID + * @return 数量 + */ + long countByAppIdAndStatusAndIdIn(Long appId, int status, Set jobIds); long countByAppIdAndStatusNot(long appId, int status); diff --git 
a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index a33e2982..0bef44d1 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -49,40 +49,49 @@ public class DispatchService { @Resource private InstanceInfoRepository instanceInfoRepository; - private static final Splitter commaSplitter = Splitter.on(","); + private static final Splitter COMMA_SPLITTER = Splitter.on(","); @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024) - public void redispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) { - InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); - dispatch(jobInfo, instanceId, currentRunningTimes, instanceInfo.getInstanceParams(), instanceInfo.getWfInstanceId()); + public void redispatch(JobInfoDO jobInfo, long instanceId) { + // 这里暂时保留 + dispatch(jobInfo, instanceId); } /** * 将任务从Server派发到Worker(TaskTracker) - * @param jobInfo 任务的元信息 + * ************************************************** + * 2021-02-03 modify by Echo009 + * 1、移除参数 当前运行次数、工作流实例ID、实例参数 + * 更改为从当前任务实例中获取获取以上信息 + * 2、移除运行次数相关的(runningTimes)处理逻辑 + * 迁移至 {@link InstanceManager#updateStatus} 中处理 + * ************************************************** + * + * @param jobInfo 任务的元信息,注意这里传入的 jobInfo 可能为空对象 * @param instanceId 任务实例ID - * @param currentRunningTimes 当前运行的次数 - * @param instanceParams 实例的运行参数,API触发方式专用 - * @param wfInstanceId 工作流任务实例ID,workflow 任务专用 */ @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024) - public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes, String instanceParams, Long wfInstanceId) { - Long jobId = jobInfo.getId(); - 
log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, instanceParams); - - Date now = new Date(); - String dbInstanceParams = instanceParams == null ? "" : instanceParams; - + public void dispatch(JobInfoDO jobInfo, long instanceId) { // 检查当前任务是否被取消 InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); - if ( CANCELED.getV() == instanceInfo.getStatus()) { + Long jobId = instanceInfo.getJobId(); + if (CANCELED.getV() == instanceInfo.getStatus()) { log.info("[Dispatcher-{}|{}] cancel dispatch due to instance has been canceled", jobId, instanceId); return; } + // 任务信息已经被删除 + if (jobInfo.getId() == null) { + log.warn("[Dispatcher-{}|{}] cancel dispatch due to job(id={}) has been deleted!", jobId, instanceId, jobId); + instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, "can't find job by id " + jobId); + return; + } + + Date now = new Date(); + String dbInstanceParams = instanceInfo.getInstanceParams() == null ? 
"" : instanceInfo.getInstanceParams(); + log.info("[Dispatcher-{}|{}] start to dispatch job: {};instanceParams: {}.", jobId, instanceId, jobInfo, dbInstanceParams); // 查询当前运行的实例数 long current = System.currentTimeMillis(); - Integer maxInstanceNum = jobInfo.getMaxInstanceNum(); // 秒级任务只派发到一台机器,具体的 maxInstanceNum 由 TaskTracker 控制 if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) {
-> { - // 空,则全部不过滤 - if (StringUtils.isEmpty(jobInfo.getDesignatedWorkers())) { - return false; - } - // 非空,只有匹配上的 worker 才不被过滤 - Set designatedWorkers = Sets.newHashSet(commaSplitter.splitToList(jobInfo.getDesignatedWorkers())); - return !designatedWorkers.contains(worker.getAddress()); - }); - - if (CollectionUtils.isEmpty(allAvailableWorker)) { + if (CollectionUtils.isEmpty(suitableWorkers)) { String clusterStatusDescription = WorkerClusterManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId()); log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available, clusterStatus is {}.", jobId, instanceId, clusterStatusDescription); - instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, dbInstanceParams, now); + instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, now); - instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE); + instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE); return; } + List workerIpList = suitableWorkers.stream().map(WorkerInfo::getAddress).collect(Collectors.toList()); - // 限定集群大小(0代表不限制) - if (jobInfo.getMaxWorkerCount() > 0) { - if (allAvailableWorker.size() > jobInfo.getMaxWorkerCount()) { - allAvailableWorker = allAvailableWorker.subList(0, jobInfo.getMaxWorkerCount()); - } - } - List workerIpList = allAvailableWorker.stream().map(WorkerInfo::getAddress).collect(Collectors.toList()); + // 构造任务调度请求 + ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList); - // 构造请求 - ServerScheduleJobReq req = new ServerScheduleJobReq(); - BeanUtils.copyProperties(jobInfo, req); - // 传入 JobId - 
req.setJobId(jobInfo.getId()); - // 传入 InstanceParams - if (StringUtils.isEmpty(instanceParams)) { - req.setInstanceParams(null); - }else { - req.setInstanceParams(instanceParams); - } - req.setInstanceId(instanceId); - req.setAllWorkerAddress(workerIpList); - - // 设置工作流ID - req.setWfInstanceId(wfInstanceId); - - req.setExecuteType(ExecuteType.of(jobInfo.getExecuteType()).name()); - req.setProcessorType(ProcessorType.of(jobInfo.getProcessorType()).name()); - req.setTimeExpressionType(TimeExpressionType.of(jobInfo.getTimeExpressionType()).name()); - - req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit()); - - req.setThreadConcurrency(jobInfo.getConcurrency()); // 发送请求(不可靠,需要一个后台线程定期轮询状态) - WorkerInfo taskTracker = allAvailableWorker.get(0); + WorkerInfo taskTracker = suitableWorkers.get(0); String taskTrackerAddress = taskTracker.getAddress(); transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req); log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req); // 修改状态 - instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, dbInstanceParams, now); + instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), current, taskTrackerAddress, now); // 装载缓存 instanceMetadataService.loadJobInfo(instanceId, jobInfo); } + + /** + * 获取当前最合适的 worker 列表 + */ + private List obtainSuitableWorkers(JobInfoDO jobInfo) { + // 获取当前所有可用的Worker + List allAvailableWorker = WorkerClusterManagerService.getSortedAvailableWorkers(jobInfo.getAppId(), jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace()); + + // 筛选指定的机器 + allAvailableWorker.removeIf(worker -> { + // 空,则全部不过滤 + if (StringUtils.isEmpty(jobInfo.getDesignatedWorkers())) { + return false; + } + // 非空,只有匹配上的 worker 才不被过滤 + Set 
designatedWorkers = Sets.newHashSet(COMMA_SPLITTER.splitToList(jobInfo.getDesignatedWorkers())); + return !designatedWorkers.contains(worker.getAddress()); + }); + + + // 限定集群大小(0代表不限制) + if (!allAvailableWorker.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && allAvailableWorker.size() > jobInfo.getMaxWorkerCount()) { + allAvailableWorker = allAvailableWorker.subList(0, jobInfo.getMaxWorkerCount()); + } + return allAvailableWorker; + } + + /** + * 构造任务调度请求 + */ + private ServerScheduleJobReq constructServerScheduleJobReq(JobInfoDO jobInfo, InstanceInfoDO instanceInfo, List finalWorkersIpList) { + // 构造请求 + ServerScheduleJobReq req = new ServerScheduleJobReq(); + BeanUtils.copyProperties(jobInfo, req); + // 传入 JobId + req.setJobId(jobInfo.getId()); + // 传入 InstanceParams + if (StringUtils.isEmpty(instanceInfo.getInstanceParams())) { + req.setInstanceParams(null); + } else { + req.setInstanceParams(instanceInfo.getInstanceParams()); + } + // 覆盖静态参数 + if (!StringUtils.isEmpty(instanceInfo.getJobParams())) { + req.setJobParams(instanceInfo.getJobParams()); + } + req.setInstanceId(instanceInfo.getInstanceId()); + req.setAllWorkerAddress(finalWorkersIpList); + + // 设置工作流ID + req.setWfInstanceId(instanceInfo.getWfInstanceId()); + + req.setExecuteType(ExecuteType.of(jobInfo.getExecuteType()).name()); + req.setProcessorType(ProcessorType.of(jobInfo.getProcessorType()).name()); + + req.setTimeExpressionType(TimeExpressionType.of(jobInfo.getTimeExpressionType()).name()); + req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit()); + req.setThreadConcurrency(jobInfo.getConcurrency()); + return req; + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index eed840c8..153d1791 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ 
b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -24,6 +24,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; +import java.text.ParseException; import java.util.Date; import java.util.List; import java.util.Optional; @@ -52,18 +53,19 @@ public class JobService { /** * 保存/修改任务 + * * @param request 任务请求 * @return 创建的任务ID(jobId) - * @throws Exception 异常 + * @throws ParseException 异常 */ - public Long saveJob(SaveJobInfoRequest request) throws Exception { + public Long saveJob(SaveJobInfoRequest request) throws ParseException { request.valid(); JobInfoDO jobInfoDO; if (request.getId() != null) { jobInfoDO = jobInfoRepository.findById(request.getId()).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + request.getId())); - }else { + } else { jobInfoDO = new JobInfoDO(); } @@ -107,9 +109,10 @@ public class JobService { /** * 手动立即运行某个任务 - * @param jobId 任务ID + * + * @param jobId 任务ID * @param instanceParams 任务实例参数(仅 OpenAPI 存在) - * @param delay 延迟时间,单位 毫秒 + * @param delay 延迟时间,单位 毫秒 * @return 任务实例ID */ @DesignateServer(appIdParameterName = "appId") @@ -119,14 +122,12 @@ public class JobService { JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); log.info("[Job-{}] try to run job in app[{}], instanceParams={},delay={} ms.", jobInfo.getId(), appId, instanceParams, delay); - Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0)); + Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0)); instanceInfoRepository.flush(); if (delay <= 0) { - dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); - }else { - 
InstanceTimeWheelService.schedule(instanceId, delay, () -> { - dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); - }); + dispatchService.dispatch(jobInfo, instanceId); + } else { + InstanceTimeWheelService.schedule(instanceId, delay, () -> dispatchService.dispatch(jobInfo, instanceId)); } log.info("[Job-{}] run job successfully, params={}, instanceId={}", jobInfo.getId(), instanceParams, instanceId); return instanceId; @@ -135,6 +136,7 @@ public class JobService { /** * 删除某个任务 + * * @param jobId 任务ID */ public void deleteJob(Long jobId) { @@ -150,10 +152,11 @@ public class JobService { /** * 启用某个任务 + * * @param jobId 任务ID - * @throws Exception 异常(CRON表达式错误) + * @throws ParseException 异常(CRON表达式错误) */ - public void enableJob(Long jobId) throws Exception { + public void enableJob(Long jobId) throws ParseException { JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId)); jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV()); @@ -166,7 +169,7 @@ public class JobService { * 停止或删除某个JOB * 秒级任务还要额外停止正在运行的任务实例 */ - private void shutdownOrStopJob(Long jobId, SwitchableStatus status) throws IllegalArgumentException { + private void shutdownOrStopJob(Long jobId, SwitchableStatus status) { // 1. 
先更新 job_info 表 Optional jobInfoOPT = jobInfoRepository.findById(jobId); @@ -193,12 +196,13 @@ public class JobService { try { // 重复查询了数据库,不过问题不大,这个调用量很小 instanceService.stopInstance(instance.getInstanceId()); - }catch (Exception ignore) { + } catch (Exception ignore) { + // ignore exception } }); } - private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws Exception { + private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws ParseException { // 计算下次调度时间 Date now = new Date(); TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); @@ -210,7 +214,7 @@ public class JobService { throw new PowerJobException("cron expression is out of date: " + jobInfoDO.getTimeExpression()); } jobInfoDO.setNextTriggerTime(nextValidTime.getTime()); - }else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) { + } else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) { jobInfoDO.setTimeExpression(null); } // 重写最后修改时间 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java index da6f6403..5da90e35 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java @@ -22,6 +22,7 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Date; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** @@ -48,9 +49,15 @@ public class InstanceManager { /** * 更新任务状态 + * ******************************************** + * 2021-02-03 modify by Echo009 + * 实例的执行次数统一在这里管理,对于非固定频率的任务 + * 当 db 中实例的状态为等待派发时,runningTimes + 1 + * 
******************************************** + * * @param req TaskTracker上报任务实例状态的请求 */ - public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws Exception { + public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws ExecutionException { Long instanceId = req.getInstanceId(); @@ -66,26 +73,15 @@ public class InstanceManager { log.warn("[InstanceManager-{}] receive the expired status report request: {}, this report will be dropped.", instanceId, req); return; } - // 丢弃非目标 TaskTracker 的上报数据(脑裂情况) if (!req.getSourceAddress().equals(instanceInfo.getTaskTrackerAddress())) { log.warn("[InstanceManager-{}] receive the other TaskTracker's report: {}, but current TaskTracker is {}, this report will br dropped.", instanceId, req, instanceInfo.getTaskTrackerAddress()); return; } - // 如果任务已经结束,直接丢弃该请求(StopInstance 和 ReportStatus 同时发送的情况) - if (InstanceStatus.finishedStatus.contains(instanceInfo.getStatus())) { - log.info("[InstanceManager-{}] instance already finished, this report[{}] will be dropped!", instanceId, req); - return; - } - - InstanceStatus newStatus = InstanceStatus.of(req.getInstanceStatus()); + InstanceStatus receivedInstanceStatus = InstanceStatus.of(req.getInstanceStatus()); Integer timeExpressionType = jobInfo.getTimeExpressionType(); - instanceInfo.setStatus(newStatus.getV()); - instanceInfo.setLastReportTime(req.getReportTime()); - instanceInfo.setGmtModified(new Date()); - // FREQUENT 任务没有失败重试机制,TaskTracker一直运行即可,只需要将存活信息同步到DB即可 // FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行) // 综上,直接把 status 和 runningNum 同步到DB即可 @@ -96,14 +92,22 @@ public class InstanceManager { instanceInfoRepository.saveAndFlush(instanceInfo); return; } + // 更新运行次数 + if (instanceInfo.getStatus() == InstanceStatus.WAITING_WORKER_RECEIVE.getV()) { + // 这里不会存在并发问题 + instanceInfo.setRunningTimes(instanceInfo.getRunningTimes() + 1); + } + instanceInfo.setStatus(receivedInstanceStatus.getV()); + 
instanceInfo.setLastReportTime(req.getReportTime()); + instanceInfo.setGmtModified(new Date()); boolean finished = false; - if (newStatus == InstanceStatus.SUCCEED) { + if (receivedInstanceStatus == InstanceStatus.SUCCEED) { instanceInfo.setResult(req.getResult()); instanceInfo.setFinishedTime(System.currentTimeMillis()); finished = true; - }else if (newStatus == InstanceStatus.FAILED) { + } else if (receivedInstanceStatus == InstanceStatus.FAILED) { // 当前重试次数 <= 最大重试次数,进行重试 (第一次运行,runningTimes为1,重试一次,instanceRetryNum也为1,故需要 =) if (instanceInfo.getRunningTimes() <= jobInfo.getInstanceRetryNum()) { @@ -111,14 +115,12 @@ public class InstanceManager { log.info("[InstanceManager-{}] instance execute failed but will take the {}th retry.", instanceId, instanceInfo.getRunningTimes()); // 延迟10S重试(由于重试不改变 instanceId,如果派发到同一台机器,上一个 TaskTracker 还处于资源释放阶段,无法创建新的TaskTracker,任务失败) - HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> { - dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes()); - }, 10, TimeUnit.SECONDS); + HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> dispatchService.redispatch(jobInfo, instanceId), 10, TimeUnit.SECONDS); // 修改状态为 等待派发,正式开始重试 // 问题:会丢失以往的调度记录(actualTriggerTime什么的都会被覆盖) instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); - }else { + } else { instanceInfo.setResult(req.getResult()); instanceInfo.setFinishedTime(System.currentTimeMillis()); finished = true; @@ -131,16 +133,17 @@ public class InstanceManager { if (finished) { // 这里的 InstanceStatus 只有 成功/失败 两种,手动停止不会由 TaskTracker 上报 - processFinishedInstance(instanceId, req.getWfInstanceId(), newStatus, req.getResult()); + processFinishedInstance(instanceId, req.getWfInstanceId(), receivedInstanceStatus, req.getResult()); } } /** * 收尾完成的任务实例 - * @param instanceId 任务实例ID + * + * @param instanceId 任务实例ID * @param wfInstanceId 工作流实例ID,非必须 - * @param status 任务状态,有 成功/失败/手动停止 - * @param result 执行结果 + * @param status 任务状态,有 成功/失败/手动停止 + * @param result 执行结果 
*/ public void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) { @@ -160,7 +163,7 @@ public class InstanceManager { JobInfoDO jobInfo; try { jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(instanceId); - }catch (Exception e) { + } catch (Exception e) { log.warn("[InstanceManager-{}] can't find jobInfo, alarm failed.", instanceId); return; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java index bad1d98c..33d1e4da 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java @@ -57,14 +57,20 @@ public class InstanceService { /** * 创建任务实例(注意,该方法并不调用 saveAndFlush,如果有需要立即同步到DB的需求,请在方法结束后手动调用 flush) - * @param jobId 任务ID - * @param appId 所属应用ID - * @param instanceParams 任务实例参数,仅 OpenAPI 创建时存在 - * @param wfInstanceId 工作流任务实例ID,仅工作流下的任务实例存在 + * ******************************************** + * 2021-02-03 modify by Echo009 + * 新增 jobParams ,每次均记录任务静态参数 + * ******************************************** + * + * @param jobId 任务ID + * @param appId 所属应用ID + * @param jobParams 任务静态参数 + * @param instanceParams 任务实例参数,仅 OpenAPI 创建 或者 工作流任务 时存在 + * @param wfInstanceId 工作流任务实例ID,仅工作流下的任务实例存在 * @param expectTriggerTime 预期执行时间 * @return 任务实例ID */ - public Long create(Long jobId, Long appId, String instanceParams, Long wfInstanceId, Long expectTriggerTime) { + public Long create(Long jobId, Long appId, String jobParams, String instanceParams, Long wfInstanceId, Long expectTriggerTime) { Long instanceId = idGenerateService.allocate(); Date now = new Date(); @@ -73,11 +79,13 @@ public class InstanceService { newInstanceInfo.setJobId(jobId); newInstanceInfo.setAppId(appId); 
newInstanceInfo.setInstanceId(instanceId); + newInstanceInfo.setJobParams(jobParams); newInstanceInfo.setInstanceParams(instanceParams); newInstanceInfo.setType(wfInstanceId == null ? InstanceType.NORMAL.getV() : InstanceType.WORKFLOW.getV()); newInstanceInfo.setWfInstanceId(wfInstanceId); newInstanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); + newInstanceInfo.setRunningTimes(0L); newInstanceInfo.setExpectedTriggerTime(expectTriggerTime); newInstanceInfo.setLastReportTime(-1L); newInstanceInfo.setGmtCreate(now); @@ -89,6 +97,7 @@ public class InstanceService { /** * 停止任务实例 + * * @param instanceId 任务实例ID */ public void stopInstance(Long instanceId) { @@ -125,9 +134,9 @@ public class InstanceService { log.warn("[Instance-{}] update instanceInfo successfully but can't find TaskTracker to stop instance", instanceId); } - }catch (IllegalArgumentException ie) { + } catch (IllegalArgumentException ie) { throw ie; - }catch (Exception e) { + } catch (Exception e) { log.error("[Instance-{}] stopInstance failed.", instanceId, e); throw e; } @@ -135,6 +144,7 @@ public class InstanceService { /** * 重试任务(只有结束的任务运行重试) + * * @param instanceId 任务实例ID */ @DesignateServer(appIdParameterName = "appId") @@ -162,12 +172,13 @@ public class InstanceService { // 派发任务 Long jobId = instanceInfo.getJobId(); JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new PowerJobException("can't find job info by jobId: " + jobId)); - dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes()); + dispatchService.redispatch(jobInfo, instanceId); } /** * 取消任务实例的运行 * 接口使用条件:调用接口时间与待取消任务的预计执行时间有一定时间间隔,否则不保证可靠性! 
+ * * @param instanceId 任务实例 */ public void cancelInstance(Long instanceId) { @@ -194,12 +205,12 @@ public class InstanceService { // 如果写 DB 失败,抛异常,接口返回 false,即取消失败,任务会被 HA 机制重新调度执行,因此此处不需要任何处理 instanceInfoRepository.saveAndFlush(instanceInfo); log.info("[Instance-{}] cancel the instance successfully.", instanceId); - }else { + } else { log.warn("[Instance-{}] cancel the instance failed.", instanceId); throw new PowerJobException("instance already up and running"); } - }catch (Exception e) { + } catch (Exception e) { log.error("[Instance-{}] cancelInstance failed.", instanceId, e); throw e; } @@ -215,6 +226,7 @@ public class InstanceService { /** * 获取任务实例的信息 + * * @param instanceId 任务实例ID * @return 任务实例的信息 */ @@ -224,6 +236,7 @@ public class InstanceService { /** * 获取任务实例的状态 + * * @param instanceId 任务实例ID * @return 任务实例的状态 */ @@ -234,6 +247,7 @@ public class InstanceService { /** * 获取任务实例的详细运行详细 + * * @param instanceId 任务实例ID * @return 详细运行状态 */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java index 6de92f91..aad8508d 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java @@ -75,7 +75,7 @@ public class InstanceStatusCheckService { try { checkInstance(allAppIds); checkWorkflowInstance(allAppIds); - }catch (Exception e) { + } catch (Exception e) { log.error("[InstanceStatusChecker] status check failed.", e); } log.info("[InstanceStatusChecker] status check using {}.", stopwatch.stop()); @@ -83,82 +83,97 @@ public class InstanceStatusCheckService { /** * 检查任务实例的状态,发现异常及时重试,包括 - * WAITING_DISPATCH 超时:写入时间伦但为调度前 server down + * WAITING_DISPATCH 超时:写入时间轮但未调度前 server down * WAITING_WORKER_RECEIVE 
超时:由于网络错误导致 worker 未接受成功 * RUNNING 超时:TaskTracker down,断开与 server 的心跳连接 + * * @param allAppIds 本系统所承担的所有 appIds */ private void checkInstance(List allAppIds) { Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> { - // 1. 检查等待 WAITING_DISPATCH 状态的任务 - long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS; - List waitingDispatchInstances = instanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold); - if (!CollectionUtils.isEmpty(waitingDispatchInstances)) { - log.warn("[InstanceStatusChecker] find some instance which is not triggered as expected: {}", waitingDispatchInstances); - waitingDispatchInstances.forEach(instance -> { - - // 过滤因为失败重试而改成 WAITING_DISPATCH 状态的任务实例 - long t = System.currentTimeMillis() - instance.getGmtModified().getTime(); - if (t < DISPATCH_TIMEOUT_MS) { - return; - } - - Optional jobInfoOpt = jobInfoRepository.findById(instance.getJobId()); - if (jobInfoOpt.isPresent()) { - dispatchService.redispatch(jobInfoOpt.get(), instance.getInstanceId(), 0); - } else { - log.warn("[InstanceStatusChecker] can't find job by jobId[{}], so redispatch failed, failed instance: {}", instance.getJobId(), instance); - updateFailedInstance(instance, SystemInstanceResult.CAN_NOT_FIND_JOB_INFO); - } - }); - } - + handleWaitingDispatchInstance(partAppIds); // 2. 
检查 WAITING_WORKER_RECEIVE 状态的任务 - threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS; - List waitingWorkerReceiveInstances = instanceInfoRepository.findByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold); - if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) { - log.warn("[InstanceStatusChecker] find one instance didn't receive any reply from worker, try to redispatch: {}", waitingWorkerReceiveInstances); - waitingWorkerReceiveInstances.forEach(instance -> { - // 重新派发 - JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); - dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), 0); - }); - } - - // 3. 检查 RUNNING 状态的任务(一定时间没收到 TaskTracker 的状态报告,视为失败) - threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS; - List failedInstances = instanceInfoRepository.findByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold)); - if (!CollectionUtils.isEmpty(failedInstances)) { - log.warn("[InstanceStatusCheckService] instances({}) has not received status report for a long time.", failedInstances); - failedInstances.forEach(instance -> { - - JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); - TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); - SwitchableStatus switchableStatus = SwitchableStatus.of(jobInfoDO.getStatus()); - - // 如果任务已关闭,则不进行重试,将任务置为失败即可;秒级任务也直接置为失败,由派发器重新调度 - if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) { - updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT); - return; - } - - // CRON 和 API一样,失败次数 + 1,根据重试配置进行重试 - if (instance.getRunningTimes() < jobInfoDO.getInstanceRetryNum()) { - dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); - }else { - 
updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT); - } - - }); - } + handleWaitingWorkerReceiveInstance(partAppIds); + // 3. 检查 RUNNING 状态的任务(一定时间内没收到 TaskTracker 的状态报告,视为失败) + handleRunningInstance(partAppIds); }); } + + private void handleWaitingDispatchInstance(List partAppIds) { + // 1. 检查等待 WAITING_DISPATCH 状态的任务 + long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS; + List waitingDispatchInstances = instanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold); + if (!CollectionUtils.isEmpty(waitingDispatchInstances)) { + log.warn("[InstanceStatusChecker] find some instance which is not triggered as expected: {}", waitingDispatchInstances); + waitingDispatchInstances.forEach(instance -> { + + // 过滤因为失败重试而改成 WAITING_DISPATCH 状态的任务实例 + long t = System.currentTimeMillis() - instance.getGmtModified().getTime(); + if (t < DISPATCH_TIMEOUT_MS) { + return; + } + + Optional jobInfoOpt = jobInfoRepository.findById(instance.getJobId()); + if (jobInfoOpt.isPresent()) { + dispatchService.redispatch(jobInfoOpt.get(), instance.getInstanceId()); + } else { + log.warn("[InstanceStatusChecker] can't find job by jobId[{}], so redispatch failed, failed instance: {}", instance.getJobId(), instance); + updateFailedInstance(instance, SystemInstanceResult.CAN_NOT_FIND_JOB_INFO); + } + }); + } + } + + private void handleWaitingWorkerReceiveInstance(List partAppIds) { + // 2. 
检查 WAITING_WORKER_RECEIVE 状态的任务 + long threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS; + List waitingWorkerReceiveInstances = instanceInfoRepository.findByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold); + if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) { + log.warn("[InstanceStatusChecker] find one instance didn't receive any reply from worker, try to redispatch: {}", waitingWorkerReceiveInstances); + waitingWorkerReceiveInstances.forEach(instance -> { + // 重新派发 + JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); + dispatchService.redispatch(jobInfoDO, instance.getInstanceId()); + }); + } + } + + private void handleRunningInstance(List partAppIds) { + // 3. 检查 RUNNING 状态的任务(一定时间没收到 TaskTracker 的状态报告,视为失败) + long threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS; + List failedInstances = instanceInfoRepository.findByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold)); + if (!CollectionUtils.isEmpty(failedInstances)) { + log.warn("[InstanceStatusCheckService] instances({}) has not received status report for a long time.", failedInstances); + failedInstances.forEach(instance -> { + + JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); + TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); + SwitchableStatus switchableStatus = SwitchableStatus.of(jobInfoDO.getStatus()); + + // 如果任务已关闭,则不进行重试,将任务置为失败即可;秒级任务也直接置为失败,由派发器重新调度 + if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) { + updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT); + return; + } + + // CRON 和 API一样,失败次数 + 1,根据重试配置进行重试 + if (instance.getRunningTimes() < jobInfoDO.getInstanceRetryNum()) { + dispatchService.redispatch(jobInfoDO, 
instance.getInstanceId()); + } else { + updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT); + } + + }); + } + } + /** * 定期检查工作流实例状态 * 此处仅检查并重试长时间处于 WAITING 状态的工作流实例,工作流的其他可靠性由 Instance 支撑,即子任务失败会反馈会 WorkflowInstance + * * @param allAppIds 本系统所承担的所有 appIds */ private void checkWorkflowInstance(List allAppIds) { @@ -175,7 +190,7 @@ public class InstanceStatusCheckService { waitingWfInstanceList.forEach(wfInstance -> { Optional workflowOpt = workflowInfoRepository.findById(wfInstance.getWorkflowId()); workflowOpt.ifPresent(workflowInfo -> { - workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId(), wfInstance.getWfInitParams()); + workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId()); log.info("[Workflow-{}|{}] restart workflowInstance successfully~", workflowInfo.getId(), wfInstance.getWfInstanceId()); }); }); @@ -184,7 +199,7 @@ public class InstanceStatusCheckService { } /** - * 处理上报超时而失败的任务实例 + * 处理失败的任务实例 */ private void updateFailedInstance(InstanceInfoDO instance, String result) { @@ -196,6 +211,6 @@ public class InstanceStatusCheckService { instance.setResult(result); instanceInfoRepository.saveAndFlush(instance); - instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, SystemInstanceResult.REPORT_TIMEOUT); + instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, result); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index b0e25727..69f66ff1 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -30,6 
+30,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; +import java.text.ParseException; import java.util.*; import java.util.stream.Collectors; @@ -45,7 +46,9 @@ import java.util.stream.Collectors; @Service public class OmsScheduleService { - // 每次并发调度的应用数量 + /** + * 每次并发调度的应用数量 + */ private static final int MAX_APP_NUM = 10; @Resource @@ -89,7 +92,7 @@ public class OmsScheduleService { // 调度 CRON 表达式 JOB try { scheduleCronJob(allAppIds); - }catch (Exception e) { + } catch (Exception e) { log.error("[CronScheduler] schedule cron job failed.", e); } String cronTime = stopwatch.toString(); @@ -98,7 +101,7 @@ public class OmsScheduleService { // 调度 workflow 任务 try { scheduleWorkflow(allAppIds); - }catch (Exception e) { + } catch (Exception e) { log.error("[WorkflowScheduler] schedule workflow job failed.", e); } String wfTime = stopwatch.toString(); @@ -107,7 +110,7 @@ public class OmsScheduleService { // 调度 秒级任务 try { scheduleFrequentJob(allAppIds); - }catch (Exception e) { + } catch (Exception e) { log.error("[FrequentScheduler] schedule frequent job failed.", e); } @@ -142,13 +145,13 @@ public class OmsScheduleService { log.info("[CronScheduler] These cron jobs will be scheduled: {}.", jobInfos); jobInfos.forEach(jobInfo -> { - Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), null, null, jobInfo.getNextTriggerTime()); + Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime()); jobId2InstanceId.put(jobInfo.getId(), instanceId); }); instanceInfoRepository.flush(); // 2. 
推入时间轮中等待调度执行 - jobInfos.forEach(jobInfoDO -> { + jobInfos.forEach(jobInfoDO -> { Long instanceId = jobId2InstanceId.get(jobInfoDO.getId()); @@ -156,13 +159,11 @@ public class OmsScheduleService { long delay = 0; if (targetTriggerTime < nowTime) { log.warn("[Job-{}] schedule delay, expect: {}, current: {}", jobInfoDO.getId(), targetTriggerTime, System.currentTimeMillis()); - }else { + } else { delay = targetTriggerTime - nowTime; } - InstanceTimeWheelService.schedule(instanceId, delay, () -> { - dispatchService.dispatch(jobInfoDO, instanceId, 0, null, null); - }); + InstanceTimeWheelService.schedule(instanceId, delay, () -> dispatchService.dispatch(jobInfoDO, instanceId)); }); // 3. 计算下一次调度时间(忽略5S内的重复执行,即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms) @@ -176,7 +177,7 @@ public class OmsScheduleService { jobInfoRepository.flush(); - }catch (Exception e) { + } catch (Exception e) { log.error("[CronScheduler] schedule cron job failed.", e); } }); @@ -204,12 +205,12 @@ public class OmsScheduleService { log.warn("[Workflow-{}] workflow schedule delay, expect:{}, actual: {}", wfInfo.getId(), wfInfo.getNextTriggerTime(), System.currentTimeMillis()); delay = 0; } - InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, null)); + InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId)); // 3. 
重新计算下一次调度时间并更新 try { refreshWorkflow(wfInfo); - }catch (Exception e) { + } catch (Exception e) { log.error("[Workflow-{}] refresh workflow failed.", wfInfo.getId(), e); } }); @@ -246,13 +247,13 @@ public class OmsScheduleService { Optional jobInfoOpt = jobInfoRepository.findById(jobId); jobInfoOpt.ifPresent(jobInfoDO -> jobService.runJob(jobInfoDO.getAppId(), jobId, null, 0L)); }); - }catch (Exception e) { + } catch (Exception e) { log.error("[FrequentScheduler] schedule frequent job failed.", e); } }); } - private void refreshJob(JobInfoDO jobInfo) throws Exception { + private void refreshJob(JobInfoDO jobInfo) throws ParseException { Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression()); JobInfoDO updatedJobInfo = new JobInfoDO(); @@ -261,7 +262,7 @@ public class OmsScheduleService { if (nextTriggerTime == null) { log.warn("[Job-{}] this job won't be scheduled anymore, system will set the status to DISABLE!", jobInfo.getId()); updatedJobInfo.setStatus(SwitchableStatus.DISABLE.getV()); - }else { + } else { updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime()); } updatedJobInfo.setGmtModified(new Date()); @@ -269,7 +270,7 @@ public class OmsScheduleService { jobInfoRepository.save(updatedJobInfo); } - private void refreshWorkflow(WorkflowInfoDO wfInfo) throws Exception { + private void refreshWorkflow(WorkflowInfoDO wfInfo) throws ParseException { Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); WorkflowInfoDO updateEntity = new WorkflowInfoDO(); @@ -278,7 +279,7 @@ public class OmsScheduleService { if (nextTriggerTime == null) { log.warn("[Workflow-{}] this workflow won't be scheduled anymore, system will set the status to DISABLE!", wfInfo.getId()); wfInfo.setStatus(SwitchableStatus.DISABLE.getV()); - }else { + } else { updateEntity.setNextTriggerTime(nextTriggerTime.getTime()); } @@ -288,12 +289,13 @@ public class OmsScheduleService { /** * 
计算下次触发时间 + * * @param preTriggerTime 前一次触发时间 * @param cronExpression CRON 表达式 * @return 下一次调度时间 - * @throws Exception 异常 + * @throws ParseException 异常 */ - private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression) throws Exception { + private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression) throws ParseException { CronExpression ce = new CronExpression(cronExpression); // 取最大值,防止长时间未调度任务被连续调度(原来DISABLE的任务突然被打开,不取最大值会补上过去所有的调度) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index adf8cc4c..73c94e4e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -1,10 +1,7 @@ package com.github.kfcfans.powerjob.server.service.workflow; -import com.alibaba.fastjson.JSONObject; -import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.SystemInstanceResult; -import com.github.kfcfans.powerjob.common.TimeExpressionType; -import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus; +import com.alibaba.fastjson.JSON; +import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.SegmentLock; @@ -23,19 +20,13 @@ import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter; import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarm; import com.github.kfcfans.powerjob.server.service.id.IdGenerateService; import com.github.kfcfans.powerjob.server.service.instance.InstanceService; -import 
com.google.common.collect.LinkedListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; +import com.google.common.collect.*; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import javax.annotation.Resource; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; /** * 管理运行中的工作流实例 @@ -66,8 +57,13 @@ public class WorkflowInstanceManager { /** * 创建工作流任务实例 - * @param wfInfo 工作流任务元数据(描述信息) - * @param initParams 启动参数 + * ******************************************** + * 2021-02-03 modify by Echo009 + * 通过 initParams 初始化工作流上下文(wfContext) + * ******************************************** + * + * @param wfInfo 工作流任务元数据(描述信息) + * @param initParams 启动参数 * @param expectTriggerTime 预计执行时间 * @return wfInstanceId */ @@ -86,13 +82,17 @@ public class WorkflowInstanceManager { newWfInstance.setExpectedTriggerTime(expectTriggerTime); newWfInstance.setActualTriggerTime(System.currentTimeMillis()); newWfInstance.setWfInitParams(initParams); + // 初始化上下文 + Map wfContextMap = Maps.newHashMap(); + wfContextMap.put(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY, initParams); + newWfInstance.setWfContext(JsonUtils.toJSONString(wfContextMap)); newWfInstance.setGmtCreate(now); newWfInstance.setGmtModified(now); // 校验合法性(工作是否存在且启用) - List allJobIds = Lists.newLinkedList(); - PEWorkflowDAG dag = JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); + Set allJobIds = Sets.newHashSet(); + PEWorkflowDAG dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); dag.getNodes().forEach(node -> allJobIds.add(node.getJobId())); int needNum = allJobIds.size(); long dbNum = jobInfoRepository.countByAppIdAndStatusAndIdIn(wfInfo.getAppId(), SwitchableStatus.ENABLE.getV(), allJobIds); @@ -101,19 +101,25 @@ public class WorkflowInstanceManager { if (dbNum < allJobIds.size()) 
{ log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", wfId, wfInstanceId, needNum, dbNum); onWorkflowInstanceFailed(SystemInstanceResult.CAN_NOT_FIND_JOB, newWfInstance); - }else { - workflowInstanceInfoRepository.save(newWfInstance); + } else { + workflowInstanceInfoRepository.saveAndFlush(newWfInstance); } return wfInstanceId; } /** * 开始任务 - * @param wfInfo 工作流任务信息 + * ******************************************** + * 2021-02-03 modify by Echo009 + * 1、工作流支持配置重复的任务节点 + * 2、移除参数 initParams,改为统一从工作流实例中获取 + * 传递工作流实例的 wfContext 作为 初始启动参数 + * ******************************************** + * + * @param wfInfo 工作流任务信息 * @param wfInstanceId 工作流任务实例ID - * @param initParams 启动参数 */ - public void start(WorkflowInfoDO wfInfo, Long wfInstanceId, String initParams) { + public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) { Optional wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId); if (!wfInstanceInfoOpt.isPresent()) { @@ -124,7 +130,7 @@ public class WorkflowInstanceManager { // 不是等待中,不再继续执行(可能上一流程已经失败) if (wfInstanceInfo.getStatus() != WorkflowInstanceStatus.WAITING.getV()) { - log.info("[Workflow-{}|{}] workflowInstance({}) need't running any more.", wfInfo.getId(), wfInstanceId, wfInstanceInfo); + log.info("[Workflow-{}|{}] workflowInstance({}) needn't running any more.", wfInfo.getId(), wfInstanceId, wfInstanceInfo); return; } @@ -137,19 +143,23 @@ public class WorkflowInstanceManager { try { - // 构建根任务启动参数(为了精简 worker 端实现,启动参数仍以 instanceParams 字段承接) - Map preJobId2Result = Maps.newHashMap(); - // 模拟 preJobId -> preJobResult 的格式,-1 代表前置任务不存在 - preJobId2Result.put("-1", initParams); - String wfRootInstanceParams = JSONObject.toJSONString(preJobId2Result); - - PEWorkflowDAG peWorkflowDAG = JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); + PEWorkflowDAG peWorkflowDAG = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); List roots = 
WorkflowDAGUtils.listRoots(peWorkflowDAG); peWorkflowDAG.getNodes().forEach(node -> node.setStatus(InstanceStatus.WAITING_DISPATCH.getV())); + Map nodeId2JobInfoMap = Maps.newHashMap(); // 创建所有的根任务 roots.forEach(root -> { - Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), wfRootInstanceParams, wfInstanceId, System.currentTimeMillis()); + // 注意:这里必须保证任务实例全部创建成功,如果在这里创建实例部分失败,会导致 DAG 信息不会更新,已经生成的实例节点在工作流日志中没法展示 + // 如果 job 信息缺失,在 dispatch 的时候会失败,继而使整个工作流失败 + JobInfoDO jobInfo = jobInfoRepository.findById(root.getJobId()).orElseGet(JobInfoDO::new); + if (jobInfo.getId() == null) { + // 在创建工作流实例到当前的这段时间内 job 信息被物理删除了 + log.error("[Workflow-{}|{}]job info of current node{nodeId={},jobId={}} is not present! maybe you have deleted the job!", wfInfo.getId(), wfInstanceId, root.getNodeId(), root.getJobId()); + } + nodeId2JobInfoMap.put(root.getNodeId(), jobInfo); + // instanceParam 传递的是工作流实例的 wfContext + Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), jobInfo.getJobParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis()); root.setInstanceId(instanceId); root.setStatus(InstanceStatus.RUNNING.getV()); @@ -158,13 +168,13 @@ public class WorkflowInstanceManager { // 持久化 wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV()); - wfInstanceInfo.setDag(JSONObject.toJSONString(peWorkflowDAG)); + wfInstanceInfo.setDag(JSON.toJSONString(peWorkflowDAG)); workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo); log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId); // 真正开始执行根任务 - roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, wfRootInstanceParams)); - }catch (Exception e) { + roots.forEach(root -> runInstance(nodeId2JobInfoMap.get(root.getNodeId()), root.getInstanceId())); + } catch (Exception e) { log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e); 
onWorkflowInstanceFailed(e.getMessage(), wfInstanceInfo); @@ -173,11 +183,19 @@ public class WorkflowInstanceManager { /** * 下一步(当工作流的某个任务完成时调用该方法) + * ******************************************** + * 2021-02-03 modify by Echo009 + * 1、工作流支持配置重复的任务节点 + * 2、不再获取上游任务的结果作为实例参数而是传递工作流 + * 实例的 wfContext 作为 实例参数 + * ******************************************** + * * @param wfInstanceId 工作流任务实例ID - * @param instanceId 具体完成任务的某个任务实例ID - * @param status 完成任务的任务实例状态(SUCCEED/FAILED/STOPPED) - * @param result 完成任务的任务实例结果 + * @param instanceId 具体完成任务的某个任务实例ID + * @param status 完成任务的任务实例状态(SUCCEED/FAILED/STOPPED) + * @param result 完成任务的任务实例结果 */ + @SuppressWarnings({"squid:S3776","squid:S2142","squid:S1141"}) public void move(Long wfInstanceId, Long instanceId, InstanceStatus status, String result) { int lockId = wfInstanceId.hashCode(); @@ -192,18 +210,16 @@ public class WorkflowInstanceManager { WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get(); Long wfId = wfInstance.getWorkflowId(); - // 特殊处理手动终止的情况 - if (status == InstanceStatus.STOPPED) { - // 工作流已经不在运行状态了(由用户手动停止工作流实例导致),不需要任何操作 - if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { - return; - } + // 特殊处理手动终止 且 工作流实例已经不在运行状态的情况 + if (status == InstanceStatus.STOPPED && !WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { + // 由用户手动停止工作流实例导致,不需要任何操作 + return; } try { - PEWorkflowDAG dag = JSONObject.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); - // 保存 jobId -> Node 的映射关系(一个job只能出现一次的原因) - Map jobId2Node = Maps.newHashMap(); + PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); + // 保存 nodeId -> Node 的映射关系 + Map nodeId2Node = Maps.newHashMap(); // 更新完成节点状态 boolean allFinished = true; @@ -218,11 +234,11 @@ public class WorkflowInstanceManager { if (InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) { allFinished = false; } - jobId2Node.put(node.getJobId(), node); + 
nodeId2Node.put(node.getNodeId(), node); } wfInstance.setGmtModified(new Date()); - wfInstance.setDag(JSONObject.toJSONString(dag)); + wfInstance.setDag(JSON.toJSONString(dag)); // 工作流已经结束(某个节点失败导致工作流整体已经失败),仅更新最新的DAG图 if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { workflowInstanceInfoRepository.saveAndFlush(wfInstance); @@ -265,67 +281,64 @@ public class WorkflowInstanceManager { dag.getEdges().forEach(edge -> relyMap.put(edge.getTo(), edge.getFrom())); // 重新计算需要派发的任务 - Map jobId2InstanceId = Maps.newHashMap(); - Map jobId2InstanceParams = Maps.newHashMap(); - - relyMap.keySet().forEach(jobId -> { - + List readyNodes = Lists.newArrayList(); + Map nodeId2JobInfoMap = Maps.newHashMap(); + relyMap.keySet().forEach(nodeId -> { + PEWorkflowDAG.Node currentNode = nodeId2Node.get(nodeId); // 跳过已完成节点(理论上此处不可能出现 FAILED 的情况)和已派发节点(存在 InstanceId) - if (jobId2Node.get(jobId).getStatus() == InstanceStatus.SUCCEED.getV() || jobId2Node.get(jobId).getInstanceId() != null) { + if (currentNode.getStatus() == InstanceStatus.SUCCEED.getV() || currentNode.getInstanceId() != null) { return; } // 判断某个任务所有依赖的完成情况,只要有一个未成功,即无法执行 - for (Long reliedJobId : relyMap.get(jobId)) { - if (jobId2Node.get(reliedJobId).getStatus() != InstanceStatus.SUCCEED.getV()) { + for (Long reliedJobId : relyMap.get(nodeId)) { + if (nodeId2Node.get(reliedJobId).getStatus() != InstanceStatus.SUCCEED.getV()) { return; } } - - // 所有依赖已经执行完毕,可以执行该任务 (为什么是 Key 是 String?在 JSON 标准中,key必须由双引号引起来,Long会导致结果无法被反序列化) - Map preJobId2Result = Maps.newHashMap(); - // 构建下一个任务的入参 (前置任务 jobId -> result) - relyMap.get(jobId).forEach(jid -> preJobId2Result.put(String.valueOf(jid), jobId2Node.get(jid).getResult())); - - Long newInstanceId = instanceService.create(jobId, wfInstance.getAppId(), JsonUtils.toJSONString(preJobId2Result), wfInstanceId, System.currentTimeMillis()); - jobId2Node.get(jobId).setInstanceId(newInstanceId); - 
jobId2Node.get(jobId).setStatus(InstanceStatus.RUNNING.getV()); - - jobId2InstanceId.put(jobId, newInstanceId); - jobId2InstanceParams.put(jobId, JSONObject.toJSONString(preJobId2Result)); - - log.debug("[Workflow-{}|{}] workflowInstance start to process new node(jobId={},instanceId={})", wfId, wfInstanceId, jobId, newInstanceId); + // 同理:这里必须保证任务实例全部创建成功,避免部分失败导致已经生成的实例节点在工作流日志中没法展示 + JobInfoDO jobInfo = jobInfoRepository.findById(currentNode.getJobId()).orElseGet(JobInfoDO::new); + if (jobInfo.getId() == null) { + // 在创建工作流实例到当前的这段时间内 job 信息被物理删除了 + log.error("[Workflow-{}|{}]job info of current node{nodeId={},jobId={}} is not present! maybe you have deleted the job!", wfId, wfInstanceId, currentNode.getNodeId(), currentNode.getJobId()); + } + nodeId2JobInfoMap.put(nodeId, jobInfo); + // instanceParam 传递的是工作流实例的 wfContext + Long newInstanceId = instanceService.create(jobInfo.getId(), wfInstance.getAppId(), jobInfo.getJobParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis()); + currentNode.setInstanceId(newInstanceId); + currentNode.setStatus(InstanceStatus.RUNNING.getV()); + readyNodes.add(currentNode); + log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, currentNode.getNodeId(), currentNode.getJobId(), newInstanceId); }); - wfInstance.setDag(JSONObject.toJSONString(dag)); + wfInstance.setDag(JSON.toJSONString(dag)); workflowInstanceInfoRepository.saveAndFlush(wfInstance); - // 持久化结束后,开始调度执行所有的任务 - jobId2InstanceId.forEach((jobId, newInstanceId) -> runInstance(jobId, newInstanceId, wfInstanceId, jobId2InstanceParams.get(jobId))); + readyNodes.forEach(node -> runInstance(nodeId2JobInfoMap.get(node.getNodeId()), node.getInstanceId())); - }catch (Exception e) { + } catch (Exception e) { onWorkflowInstanceFailed("MOVE NEXT STEP FAILED: " + e.getMessage(), wfInstance); log.error("[Workflow-{}|{}] update failed.", wfId, wfInstanceId, e); } } catch (InterruptedException 
ignore) { + // ignore } finally { segmentLock.unlock(lockId); } } + /** * 运行任务实例 * 需要将创建和运行任务实例分离,否则在秒失败情况下,会发生DAG覆盖更新的问题 - * @param jobId 任务ID + * + * @param jobInfo 任务信息 * @param instanceId 任务实例ID - * @param wfInstanceId 工作流任务实例ID - * @param instanceParams 任务实例参数,值为上游任务的执行结果: preJobId to preJobInstanceResult */ - private void runInstance(Long jobId, Long instanceId, Long wfInstanceId, String instanceParams) { - JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); + private void runInstance(JobInfoDO jobInfo, Long instanceId) { // 洗去时间表达式类型 jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV()); - dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, wfInstanceId); + dispatchService.dispatch(jobInfo, instanceId); } private void onWorkflowInstanceFailed(String result, WorkflowInstanceInfoDO wfInstance) { @@ -349,7 +362,8 @@ public class WorkflowInstanceManager { List userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds()); AlarmCenter.alarmFailed(content, userList); }); - }catch (Exception ignore) { + } catch (Exception ignore) { + // ignore } } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java index 9195b87e..38624f3b 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java @@ -1,6 +1,6 @@ package com.github.kfcfans.powerjob.server.service.workflow; -import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.JSON; import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.TimeExpressionType; import 
com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; @@ -18,6 +18,7 @@ import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.text.ParseException; import java.util.Date; /** @@ -39,9 +40,9 @@ public class WorkflowService { * 保存/修改DAG工作流 * @param req 请求 * @return 工作流ID - * @throws Exception 异常 + * @throws ParseException 异常 */ - public Long saveWorkflow(SaveWorkflowRequest req) throws Exception { + public Long saveWorkflow(SaveWorkflowRequest req) throws ParseException { req.valid(); @@ -60,7 +61,7 @@ public class WorkflowService { BeanUtils.copyProperties(req, wf); wf.setGmtModified(new Date()); - wf.setPeDAG(JSONObject.toJSONString(req.getPEWorkflowDAG())); + wf.setPeDAG(JSON.toJSONString(req.getPEWorkflowDAG())); wf.setStatus(req.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV()); wf.setTimeExpressionType(req.getTimeExpressionType().getV()); @@ -147,9 +148,9 @@ public class WorkflowService { log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay); Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay); if (delay <= 0) { - workflowInstanceManager.start(wfInfo, wfInstanceId, initParams); + workflowInstanceManager.start(wfInfo, wfInstanceId); }else { - InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, initParams)); + InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId)); } return wfInstanceId; } diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java index 341fe412..ba5f8530 100644 --- a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java 
+++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java @@ -86,7 +86,7 @@ public class RepositoryTest { @Test @Transactional public void testExecuteLogUpdate() { - instanceInfoRepository.update4TriggerFailed(1586310414570L, 2, 100, System.currentTimeMillis(), System.currentTimeMillis(), "192.168.1.1", "NULL", "", new Date()); + instanceInfoRepository.update4TriggerFailed(1586310414570L, 2, System.currentTimeMillis(), System.currentTimeMillis(), "192.168.1.1", "NULL", new Date()); instanceInfoRepository.update4FrequentJob(1586310419650L, 2, 200, new Date()); }