From a86834d9a9e6b23fa7acba7e8a7a49f5990795fe Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 7 Apr 2020 17:49:12 +0800 Subject: [PATCH] develop the instance status checker and InstanceManager --- .../github/kfcfans/common/InstanceStatus.java | 9 + .../TaskTrackerReportInstanceStatusReq.java | 2 + .../oms/server/common/utils/SpringUtils.java | 28 +++ .../oms/server/core/InstanceManager.java | 159 ++++++++++++++++++ .../oms/server/core/InstanceStatusHolder.java | 25 +++ .../oms/server/core/akka/ServerActor.java | 10 +- .../server/persistence/model/AppInfoDO.java | 2 +- .../persistence/model/ExecuteLogDO.java | 7 +- .../repository/ExecuteLogRepository.java | 27 ++- .../oms/server/service/DispatchService.java | 11 +- .../timing/InstanceStatusCheckService.java | 109 ++++++++++++ .../schedule/HashedWheelTimerHolder.java | 2 +- .../schedule/JobScheduleService.java | 12 +- .../oms/server/test/RepositoryTest.java | 13 ++ .../worker/core/tracker/task/TaskTracker.java | 1 + 15 files changed, 400 insertions(+), 17 deletions(-) create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/SpringUtils.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceManager.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceStatusHolder.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java rename oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/{ => timing}/schedule/HashedWheelTimerHolder.java (85%) rename oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/{ => timing}/schedule/JobScheduleService.java (94%) diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java index 910a02e5..1bb1f815 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java @@ -22,4 +22,13 @@ public enum InstanceStatus { private int v; private String des; + + public static InstanceStatus of(int v) { + for (InstanceStatus is : values()) { + if (v == is.v) { + return is; + } + } + throw new IllegalArgumentException("InstanceStatus has no item for value " + v); + } } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java index 74d62884..b6635c5b 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java @@ -24,4 +24,6 @@ public class TaskTrackerReportInstanceStatusReq implements Serializable { private long totalTaskNum; private long succeedTaskNum; private long failedTaskNum; + + private long reportTime; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/SpringUtils.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/SpringUtils.java new file mode 100644 index 00000000..2d9a9800 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/SpringUtils.java @@ -0,0 +1,28 @@ +package com.github.kfcfans.oms.server.common.utils; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +/** + * Spring ApplicationContext 工具类 + * + * @author tjq + * @since 2020/4/7 + */ +@Component +public class SpringUtils implements ApplicationContextAware { + + private static ApplicationContext context; + + public static T getBean(Class clz) { + return context.getBean(clz); + } + + @Override + public void setApplicationContext(ApplicationContext ctx) throws BeansException { + context = ctx; + } +} + diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceManager.java new file mode 100644 index 00000000..fb3c1dfa --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceManager.java @@ -0,0 +1,159 @@ +package com.github.kfcfans.oms.server.core; + +import com.github.kfcfans.common.InstanceStatus; +import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; +import com.github.kfcfans.oms.server.common.constans.TimeExpressionType; +import com.github.kfcfans.oms.server.common.utils.SpringUtils; +import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; +import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; +import com.github.kfcfans.oms.server.service.DispatchService; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; + +import java.util.Date; +import java.util.Map; + +/** + * 管理被调度的服务 + * + * @author tjq + * @since 2020/4/7 + */ +@Slf4j +public class InstanceManager { + + // 存储 instanceId 对应的 Job 信息,便于重试 + private static final Map instanceId2JobInfo = Maps.newConcurrentMap(); + // 存储 instance 的状态,只有状态变更才会更新数据库,减轻DB压力 + private static final Map instanceId2StatusHolder = Maps.newConcurrentMap(); + + // Spring Bean + private static DispatchService dispatchService; + private static ExecuteLogRepository executeLogRepository; + private static JobInfoRepository jobInfoRepository; + + /** + * 注册到任务实例管理器 + * @param instanceId 即将运行的任务实例ID + * @param jobInfoDO 即将运行的任务实例对应的任务元数据 + */ + public static void register(Long instanceId, JobInfoDO jobInfoDO) { + + InstanceStatusHolder statusHolder = new InstanceStatusHolder(); + statusHolder.setInstanceId(instanceId); + statusHolder.setInstanceStatus(InstanceStatus.WAITING_DISPATCH.getV()); + + instanceId2JobInfo.put(instanceId, jobInfoDO); + instanceId2StatusHolder.put(instanceId, statusHolder); + } + + /** + * 更新任务状态 + * @param req TaskTracker上报任务实例状态的请求 + */ + public static void updateStatus(TaskTrackerReportInstanceStatusReq req) { + + Long jobId = req.getJobId(); + Long instanceId = req.getInstanceId(); + + // 不存在,可能该任务实例刚经历Server变更,需要重新构建基础信息 + if (!instanceId2JobInfo.containsKey(instanceId)) { + log.warn("[InstanceManager] can't find any register info for instance(jobId={},instanceId={}), maybe change the server.", jobId, instanceId); + + JobInfoDO JobInfoDo = getJobInfoRepository().getOne(jobId); + instanceId2JobInfo.put(instanceId, JobInfoDo); + } + + // 更新本地保存的任务实例状态(用于未完成任务前的详细信息查询和缓存加速) + InstanceStatusHolder statusHolder = instanceId2StatusHolder.computeIfAbsent(instanceId, ignore -> new InstanceStatusHolder()); + if (req.getReportTime() > statusHolder.getLastReportTime()) { + BeanUtils.copyProperties(req, statusHolder); + statusHolder.setLastReportTime(req.getReportTime()); + }else { + log.warn("[InstanceManager] receive the expired status report request: {}.", req); + return; + } + + InstanceStatus newStatus = InstanceStatus.of(req.getInstanceStatus()); + Integer timeExpressionType = instanceId2JobInfo.get(instanceId).getTimeExpressionType(); + + // FREQUENT 任务没有失败重试机制,TaskTracker一直运行即可,只需要将存活信息同步到DB即可 + // FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行) + // 综上,直接把 status 和 runningNum 同步到DB即可 + if (timeExpressionType != TimeExpressionType.CRON.getV()) { + getExecuteLogRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum()); + return; + } + + ExecuteLogDO updateEntity = getExecuteLogRepository().getOne(instanceId); + updateEntity.setStatus(newStatus.getV()); + updateEntity.setGmtModified(new Date()); + + boolean finished = false; + if (newStatus == InstanceStatus.SUCCEED) { + updateEntity.setResult(req.getResult()); + updateEntity.setFinishedTime(System.currentTimeMillis()); + + finished = true; + log.info("[InstanceManager] instance(instanceId={}) execute succeed.", instanceId); + }else if (newStatus == InstanceStatus.FAILED) { + + // 当前重试次数 < 最大重试次数,进行重试 + if (updateEntity.getRunningTimes() < instanceId2JobInfo.get(instanceId).getInstanceRetryNum()) { + + log.info("[InstanceManager] instance(instanceId={}) execute failed but will take the {}th retry.", instanceId, updateEntity.getRunningTimes()); + getDispatchService().dispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes()); + }else { + updateEntity.setResult(req.getResult()); + updateEntity.setFinishedTime(System.currentTimeMillis()); + finished = true; + log.info("[InstanceManager] instance(instanceId={}) execute failed and have no chance to retry.", instanceId); + } + } + + // 同步状态变更信息到数据库 + getExecuteLogRepository().saveAndFlush(updateEntity); + + // 清除已完成的实例信息 + if (finished) { + instanceId2JobInfo.remove(instanceId); + instanceId2StatusHolder.remove(instanceId); + } + } + + private static ExecuteLogRepository getExecuteLogRepository() { + while (executeLogRepository == null) { + try { + Thread.sleep(100); + }catch (Exception ignore) { + } + executeLogRepository = SpringUtils.getBean(ExecuteLogRepository.class); + } + return executeLogRepository; + } + + private static JobInfoRepository getJobInfoRepository() { + while (jobInfoRepository == null) { + try { + Thread.sleep(100); + }catch (Exception ignore) { + } + jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class); + } + return jobInfoRepository; + } + + private static DispatchService getDispatchService() { + while (dispatchService == null) { + try { + Thread.sleep(100); + }catch (Exception ignore) { + } + dispatchService = SpringUtils.getBean(DispatchService.class); + } + return dispatchService; + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceStatusHolder.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceStatusHolder.java new file mode 100644 index 00000000..260a8fd7 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceStatusHolder.java @@ -0,0 +1,25 @@ +package com.github.kfcfans.oms.server.core; + +import lombok.Data; + +/** + * 保存任务实例的状态信息 + * + * @author tjq + * @since 2020/4/7 + */ +@Data +public class InstanceStatusHolder { + + private long instanceId; + private int instanceStatus; + private String result; + + /* ********* 统计信息 ********* */ + private long totalTaskNum; + private long succeedTaskNum; + private long failedTaskNum; + + // 上次上报时间 + private long lastReportTime; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java index 629c2037..db366f5b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java @@ -4,6 +4,7 @@ import akka.actor.AbstractActor; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.request.WorkerHeartbeat; import com.github.kfcfans.common.response.AskResponse; +import com.github.kfcfans.oms.server.core.InstanceManager; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; import lombok.extern.slf4j.Slf4j; @@ -20,6 +21,7 @@ public class ServerActor extends AbstractActor { public Receive createReceive() { return receiveBuilder() .match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat) + .match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq) .match(Ping.class, this::onReceivePing) .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) .build(); @@ -48,7 +50,11 @@ public class ServerActor extends AbstractActor { * 处理 instance 状态 * @param req 任务实例的状态上报请求 */ - private void onReceive(TaskTrackerReportInstanceStatusReq req) { - + private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) { + try { + InstanceManager.updateStatus(req); + }catch (Exception e) { + log.error("[ServerActor] update instance status failed for request: {}.", req); + } } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java index 6688ef72..b757db4b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java @@ -23,7 +23,7 @@ public class AppInfoDO { private String appName; private String description; - // 当前负责该 appName 旗下任务调度的server地址,IP:Port + // 当前负责该 appName 旗下任务调度的server地址,IP:Port(注意,该地址为ActorSystem地址,而不是HTTP地址,两者端口不同) private String currentServer; private Date gmtCreate; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java index d8359052..535313c7 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java @@ -30,12 +30,15 @@ public class ExecuteLogDO { private int status; // 执行结果 private String result; - // 耗时 - private Long usedTime; // 预计触发时间 private Long expectedTriggerTime; // 实际触发时间 private Long actualTriggerTime; + // 结束时间 + private Long finishedTime; + + // 总共执行的次数(CRON任务 -> 代表重试次数,FREQUENT -> 代表总执行次数) + private Long runningTimes; private Date gmtCreate; private Date gmtModified; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java index 84acd089..d6f22893 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java @@ -4,6 +4,7 @@ import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; +import java.util.Date; import java.util.List; /** @@ -14,10 +15,30 @@ import java.util.List; */ public interface ExecuteLogRepository extends JpaRepository { + /** + * 统计当前JOB有多少实例正在运行 + */ long countByJobIdAndStatusIn(long jobId, List status); - @Query(value = "update execute_log set status = ?2, result = ?3 where instance_id = ?1", nativeQuery = true) - int updateStatusAndLog(long instanceId, int status, String result); - List findByJobIdIn(List jobIds); + + + /** + * 更新任务执行记录内容(DispatchService专用) + * @param instanceId 任务实例ID,分布式唯一 + * @param status 任务实例运行状态 + * @param runningTimes 运行次数 + * @param result 结果 + * @return 更新数量 + */ + @Query(value = "update execute_log set status = ?2, running_times = ?3, actual_trigger_time = now(), result = ?4, gmt_modified = now() where instance_id = ?1", nativeQuery = true) + int update4Trigger(long instanceId, int status, long runningTimes, String result); + + @Query(value = "update execute_log set status = ?2, running_times = ?3, gmt_modified = now() where instance_id = ?1", nativeQuery = true) + int update4FrequentJob(long instanceId, int status, long runningTimes); + + // 状态检查三兄弟,对应 WAITING_DISPATCH 、 WAITING_WORKER_RECEIVE 和 RUNNING 三阶段 + List findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(List jobIds, int status, long time); + List findByJobIdInAndStatusAndActualTriggerTimeLessThan(List jobIds, int status, long time); + List findByJobIdInAndStatusAndGmtModifiedBefore(List jobIds, int status, Date time); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java index c20bbb3b..0034bf39 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java @@ -20,7 +20,7 @@ import static com.github.kfcfans.common.InstanceStatus.*; /** - * 派送服务 + * 派送服务(将任务从Server派发到Worker) * * @author tjq * @since 2020/4/5 @@ -39,7 +39,7 @@ public class DispatchService { private static final String NO_WORKER_REASON = "no worker available"; private static final String EMPTY_RESULT = ""; - public void dispatch(JobInfoDO jobInfo, long instanceId) { + public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) { log.debug("[DispatchService] start to dispatch job -> {}.", jobInfo); @@ -50,7 +50,7 @@ public class DispatchService { if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { String result = String.format(FAILED_REASON, runningInstanceCount); log.warn("[DispatchService] cancel dispatch job({}) due to too much instance(num={}) is running.", jobInfo, runningInstanceCount); - executeLogRepository.updateStatusAndLog(instanceId, FAILED.getV(), result); + executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, result); return; } @@ -61,7 +61,7 @@ public class DispatchService { if (StringUtils.isEmpty(taskTrackerAddress)) { log.warn("[DispatchService] cancel dispatch job({}) due to no worker available.", jobInfo); - executeLogRepository.updateStatusAndLog(instanceId, FAILED.getV(), NO_WORKER_REASON); + executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, NO_WORKER_REASON); return; } @@ -89,8 +89,9 @@ public class DispatchService { // 发送请求(不可靠,需要一个后台线程定期轮询状态) ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(taskTrackerAddress); taskTrackerActor.tell(req, null); + log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString()); // 修改状态 - executeLogRepository.updateStatusAndLog(instanceId, WAITING_WORKER_RECEIVE.getV(), EMPTY_RESULT); + executeLogRepository.update4Trigger(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, EMPTY_RESULT); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java new file mode 100644 index 00000000..936d85f6 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java @@ -0,0 +1,109 @@ +package com.github.kfcfans.oms.server.service.timing; + +import com.github.kfcfans.common.InstanceStatus; +import com.github.kfcfans.oms.server.core.akka.OhMyServer; +import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; +import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; +import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; +import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; +import com.github.kfcfans.oms.server.service.DispatchService; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import javax.annotation.Resource; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +/** + * 定时状态检查 + * + * @author tjq + * @since 2020/4/7 + */ +@Slf4j +@Service +public class InstanceStatusCheckService { + + private static final int MAX_BATCH_NUM = 10; + private static final long DISPATCH_TIMEOUT_MS = 10000; + private static final long RECEIVE_TIMEOUT_MS = 60000; + private static final long RUNNING_TIMEOUT_MS = 60000; + + @Resource + private DispatchService dispatchService; + @Resource + private AppInfoRepository appInfoRepository; + @Resource + private ExecuteLogRepository executeLogRepository; + @Resource + private JobInfoRepository jobInfoRepository; + + @Scheduled(fixedRate = 10000) + public void timingStatusCheck() { + Stopwatch stopwatch = Stopwatch.createStarted(); + try { + innerCheck(); + }catch (Exception e) { + log.error("[InstanceStatusCheckService] status check failed.", e); + } + log.info("[InstanceStatusCheckService] status check using {}.", stopwatch.stop()); + } + + private void innerCheck() { + + // 查询DB获取该Server需要负责的AppGroup + List appInfoList = appInfoRepository.findAllByCurrentServer(OhMyServer.getActorSystemAddress()); + if (CollectionUtils.isEmpty(appInfoList)) { + log.info("[InstanceStatusCheckService] current server has no app's job to check"); + return; + } + List allAppIds = appInfoList.stream().map(AppInfoDO::getId).collect(Collectors.toList()); + + Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> { + + // 1. 检查等待 WAITING_DISPATCH 状态的任务 + long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS; + List waitingDispatchInstances = executeLogRepository.findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold); + if (!CollectionUtils.isEmpty(waitingDispatchInstances)) { + log.warn("[InstanceStatusCheckService] instances({}) is not triggered as expected.", waitingDispatchInstances); + waitingDispatchInstances.forEach(instance -> { + // 重新派发 + JobInfoDO jobInfoDO = jobInfoRepository.getOne(instance.getJobId()); + dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), 0); + }); + } + + // 2. 检查 WAITING_WORKER_RECEIVE 状态的任务 + threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS; + List waitingWorkerReceiveInstances = executeLogRepository.findByJobIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold); + if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) { + log.warn("[InstanceStatusCheckService] instances({}) did n’t receive any reply from worker.", waitingWorkerReceiveInstances); + waitingWorkerReceiveInstances.forEach(instance -> { + // 重新派发 + JobInfoDO jobInfoDO = jobInfoRepository.getOne(instance.getJobId()); + dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), 0); + }); + } + + // 3. 检查 RUNNING 状态的任务(一定时间没收到 TaskTracker 的状态报告,视为失败) + threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS; + List failedInstances = executeLogRepository.findByJobIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold)); + if (!CollectionUtils.isEmpty(failedInstances)) { + log.warn("[InstanceStatusCheckService] instances({}) has not received status report for a long time.", failedInstances); + failedInstances.forEach(instance -> { + // 重新派发 + JobInfoDO jobInfoDO = jobInfoRepository.getOne(instance.getJobId()); + dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); + }); + } + }); + } + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/HashedWheelTimerHolder.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/HashedWheelTimerHolder.java similarity index 85% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/HashedWheelTimerHolder.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/HashedWheelTimerHolder.java index 6305706d..c0cfba2f 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/HashedWheelTimerHolder.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/HashedWheelTimerHolder.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.oms.server.service.schedule; +package com.github.kfcfans.oms.server.service.timing.schedule; import com.github.kfcfans.oms.server.common.utils.timewheel.HashedWheelTimer; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java similarity index 94% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java index 480d878d..7d992973 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java @@ -1,9 +1,10 @@ -package com.github.kfcfans.oms.server.service.schedule; +package com.github.kfcfans.oms.server.service.timing.schedule; import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.common.constans.TimeExpressionType; import com.github.kfcfans.oms.server.common.utils.CronExpression; +import com.github.kfcfans.oms.server.core.InstanceManager; import com.github.kfcfans.oms.server.core.akka.OhMyServer; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; @@ -68,6 +69,7 @@ public class JobScheduleService { } List allAppIds = allAppInfos.stream().map(AppInfoDO::getId).collect(Collectors.toList()); + // 调度 CRON 表达式 JOB try { scheduleCornJob(allAppIds); }catch (Exception e) { @@ -76,6 +78,7 @@ public class JobScheduleService { log.info("[JobScheduleService] finished cron schedule, using time {}.", stopwatch); stopwatch.reset().start(); + // 调度 FIX_RATE 和 FIX_DELAY JOB try { scheduleFrequentJob(allAppIds); }catch (Exception e) { @@ -132,6 +135,10 @@ public class JobScheduleService { // 2. 推入时间轮中等待调度执行 jobInfos.forEach(jobInfoDO -> { + Long instanceId = jobId2InstanceId.get(jobInfoDO.getId()); + // 注册到任务实例管理中心 + InstanceManager.register(instanceId, jobInfoDO); + long targetTriggerTime = jobInfoDO.getNextTriggerTime(); long delay = 0; if (targetTriggerTime < nowTime) { @@ -141,9 +148,8 @@ public class JobScheduleService { } HashedWheelTimerHolder.TIMER.schedule(() -> { - dispatchService.dispatch(jobInfoDO, jobId2InstanceId.get(jobInfoDO.getId())); + dispatchService.dispatch(jobInfoDO, instanceId, 0); }, delay, TimeUnit.MILLISECONDS); - }); // 3. 计算下一次调度时间(忽略5S内的重复执行,即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms) diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java index 791ecc83..aae1b9ef 100644 --- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java @@ -3,8 +3,10 @@ package com.github.kfcfans.oms.server.test; import com.github.kfcfans.common.utils.NetUtils; import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.common.constans.TimeExpressionType; +import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.model.OmsLockDO; +import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository; import org.assertj.core.util.Lists; @@ -30,6 +32,8 @@ public class RepositoryTest { private JobInfoRepository jobInfoRepository; @Resource private OmsLockRepository omsLockRepository; + @Resource + private ExecuteLogRepository executeLogRepository; /** * 需要证明批量写入失败后会回滚 @@ -52,4 +56,13 @@ public class RepositoryTest { System.out.println(result); } + @Test + public void testUpdate() { + ExecuteLogDO updateEntity = new ExecuteLogDO(); + updateEntity.setId(22L); + updateEntity.setActualTriggerTime(System.currentTimeMillis()); + updateEntity.setResult("hahaha"); + executeLogRepository.saveAndFlush(updateEntity); + } + } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index b2ce67c5..844336fb 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -346,6 +346,7 @@ public class TaskTracker { req.setTotalTaskNum(finishedNum + unfinishedNum); req.setSucceedTaskNum(succeedNum); req.setFailedTaskNum(failedNum); + req.setReportTime(System.currentTimeMillis()); // 2. 如果未完成任务数为0,判断是否真正结束,并获取真正结束任务的执行结果 TaskDO resultTask = null;