develop the instance status checker and InstanceManager

This commit is contained in:
tjq 2020-04-07 17:49:12 +08:00
parent b69cb6fc14
commit a86834d9a9
15 changed files with 400 additions and 17 deletions

View File

@ -22,4 +22,13 @@ public enum InstanceStatus {
private int v;
private String des;
/**
 * Resolves the enum constant whose numeric code equals {@code v}.
 *
 * @param v numeric code of the wanted status
 * @return the constant whose {@code v} field matches
 * @throws IllegalArgumentException if no constant carries that code
 */
public static InstanceStatus of(int v) {
    InstanceStatus[] candidates = values();
    for (int idx = 0; idx < candidates.length; idx++) {
        InstanceStatus candidate = candidates[idx];
        if (candidate.v == v) {
            return candidate;
        }
    }
    throw new IllegalArgumentException("InstanceStatus has no item for value " + v);
}
}

View File

@ -24,4 +24,6 @@ public class TaskTrackerReportInstanceStatusReq implements Serializable {
private long totalTaskNum;
private long succeedTaskNum;
private long failedTaskNum;
private long reportTime;
}

View File

@ -0,0 +1,28 @@
package com.github.kfcfans.oms.server.common.utils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
 * Static access point to the Spring {@link ApplicationContext}, for code that is not itself
 * managed by Spring and therefore cannot use injection.
 *
 * <p>The context is captured when Spring invokes {@link #setApplicationContext} on the
 * component instance; until then {@link #getBean(Class)} fails with an
 * {@link IllegalStateException}.
 *
 * @author tjq
 * @since 2020/4/7
 */
@Component
public class SpringUtils implements ApplicationContextAware {

    // volatile: assigned through the instance callback below and read through the static
    // accessor, possibly on a different thread, so the write must be made visible.
    private static volatile ApplicationContext context;

    /**
     * Looks up the bean of the given type in the captured application context.
     *
     * @param clz type of the bean to fetch
     * @param <T> bean type
     * @return the bean returned by {@link ApplicationContext#getBean(Class)}
     * @throws IllegalStateException if the application context has not been injected yet
     */
    public static <T> T getBean(Class<T> clz) {
        // Read the field once so the null check and the call observe the same reference.
        ApplicationContext current = context;
        if (current == null) {
            throw new IllegalStateException(
                    "Spring ApplicationContext has not been injected into SpringUtils yet");
        }
        return current.getBean(clz);
    }

    /**
     * Stores the application context handed over by Spring for later static lookups.
     *
     * @param ctx the application context this component runs in
     */
    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        context = ctx;
    }
}

View File

@ -0,0 +1,159 @@
package com.github.kfcfans.oms.server.core;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.oms.server.common.constans.TimeExpressionType;
import com.github.kfcfans.oms.server.common.utils.SpringUtils;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.service.DispatchService;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import java.util.Date;
import java.util.Map;
/**
 * Keeps track of the job instances handled by this server and applies the status reports
 * sent by TaskTrackers.
 *
 * <p>All state is held in static maps and all entry points are static; the Spring beans it
 * needs are resolved lazily through {@link SpringUtils}.
 *
 * @author tjq
 * @since 2020/4/7
 */
@Slf4j
public class InstanceManager {

    // instanceId -> job metadata, kept so a failed instance can be dispatched again
    private static final Map<Long, JobInfoDO> instanceId2JobInfo = Maps.newConcurrentMap();
    // instanceId -> most recently accepted status report.
    // NOTE(review): the original comment says the DB should only be touched when the status
    // changes; updateStatus currently writes on every accepted report - confirm the intent.
    private static final Map<Long, InstanceStatusHolder> instanceId2StatusHolder = Maps.newConcurrentMap();

    // Spring beans, looked up on first use
    private static DispatchService dispatchService;
    private static ExecuteLogRepository executeLogRepository;
    private static JobInfoRepository jobInfoRepository;

    /**
     * Registers an instance that is about to run.
     *
     * @param instanceId id of the instance that is about to run
     * @param jobInfoDO  metadata of the job the instance belongs to
     */
    public static void register(Long instanceId, JobInfoDO jobInfoDO) {

        InstanceStatusHolder statusHolder = new InstanceStatusHolder();
        statusHolder.setInstanceId(instanceId);
        statusHolder.setInstanceStatus(InstanceStatus.WAITING_DISPATCH.getV());

        instanceId2JobInfo.put(instanceId, jobInfoDO);
        instanceId2StatusHolder.put(instanceId, statusHolder);
    }

    /**
     * Applies a status report: refreshes the in-memory holder, persists the new status and,
     * for a failed instance that still has retries left, dispatches it again.
     *
     * @param req status report sent by a TaskTracker
     */
    public static void updateStatus(TaskTrackerReportInstanceStatusReq req) {

        Long jobId = req.getJobId();
        Long instanceId = req.getInstanceId();

        // Read the job info once and keep it in a local: another report may finish the
        // instance and remove the map entry while this one is still being processed.
        JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId);
        if (jobInfo == null) {
            // Nothing registered locally - the instance may have been handed over from
            // another server, so rebuild the basic information from the database.
            log.warn("[InstanceManager] can't find any register info for instance(jobId={},instanceId={}), maybe change the server.", jobId, instanceId);
            jobInfo = getJobInfoRepository().getOne(jobId);
            instanceId2JobInfo.put(instanceId, jobInfo);
        }

        // Refresh the locally cached status; reports that are not newer than the last
        // accepted one are dropped.
        InstanceStatusHolder statusHolder = instanceId2StatusHolder.computeIfAbsent(instanceId, ignore -> new InstanceStatusHolder());
        if (req.getReportTime() <= statusHolder.getLastReportTime()) {
            log.warn("[InstanceManager] receive the expired status report request: {}.", req);
            return;
        }
        BeanUtils.copyProperties(req, statusHolder);
        statusHolder.setLastReportTime(req.getReportTime());

        InstanceStatus newStatus = InstanceStatus.of(req.getInstanceStatus());
        Integer timeExpressionType = jobInfo.getTimeExpressionType();

        // Non-CRON (frequent) jobs have no retry handling here: the reported status and
        // the reported total task number are written straight to the database.
        if (timeExpressionType != TimeExpressionType.CRON.getV()) {
            getExecuteLogRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum());
            return;
        }

        // NOTE(review): getOne looks the record up by primary key, while the native update
        // queries of ExecuteLogRepository filter on instance_id - confirm both are the same value.
        ExecuteLogDO updateEntity = getExecuteLogRepository().getOne(instanceId);
        updateEntity.setStatus(newStatus.getV());
        updateEntity.setGmtModified(new Date());

        boolean finished = false;
        if (newStatus == InstanceStatus.SUCCEED) {
            updateEntity.setResult(req.getResult());
            updateEntity.setFinishedTime(System.currentTimeMillis());
            finished = true;
            log.info("[InstanceManager] instance(instanceId={}) execute succeed.", instanceId);
        } else if (newStatus == InstanceStatus.FAILED) {

            // Retry while the number of runs so far is below the configured retry limit.
            if (updateEntity.getRunningTimes() < jobInfo.getInstanceRetryNum()) {
                log.info("[InstanceManager] instance(instanceId={}) execute failed but will take the {}th retry.", instanceId, updateEntity.getRunningTimes());
                getDispatchService().dispatch(jobInfo, instanceId, updateEntity.getRunningTimes());
                // dispatch() persists the new status and running_times itself. Saving
                // updateEntity afterwards would overwrite that with FAILED and the old
                // counter, so stop here.
                return;
            }

            updateEntity.setResult(req.getResult());
            updateEntity.setFinishedTime(System.currentTimeMillis());
            finished = true;
            log.info("[InstanceManager] instance(instanceId={}) execute failed and have no chance to retry.", instanceId);
        }

        // Persist the status change.
        getExecuteLogRepository().saveAndFlush(updateEntity);

        // Drop the in-memory state of instances that reached a final state.
        if (finished) {
            instanceId2JobInfo.remove(instanceId);
            instanceId2StatusHolder.remove(instanceId);
        }
    }

    /** Returns the ExecuteLogRepository bean, resolving it through SpringUtils on first use. */
    private static ExecuteLogRepository getExecuteLogRepository() {
        while (executeLogRepository == null) {
            pauseBeforeLookup();
            executeLogRepository = SpringUtils.getBean(ExecuteLogRepository.class);
        }
        return executeLogRepository;
    }

    /** Returns the JobInfoRepository bean, resolving it through SpringUtils on first use. */
    private static JobInfoRepository getJobInfoRepository() {
        while (jobInfoRepository == null) {
            pauseBeforeLookup();
            jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class);
        }
        return jobInfoRepository;
    }

    /** Returns the DispatchService bean, resolving it through SpringUtils on first use. */
    private static DispatchService getDispatchService() {
        while (dispatchService == null) {
            pauseBeforeLookup();
            dispatchService = SpringUtils.getBean(DispatchService.class);
        }
        return dispatchService;
    }

    /** Sleeps 100 ms before a bean lookup; an interrupt is recorded on the thread, not swallowed. */
    private static void pauseBeforeLookup() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }
}

View File

@ -0,0 +1,25 @@
package com.github.kfcfans.oms.server.core;
import lombok.Data;
/**
* Holds the status information of one job instance.
*
* @author tjq
* @since 2020/4/7
*/
@Data
public class InstanceStatusHolder {
private long instanceId;
private int instanceStatus;
private String result;
/* ********* statistics ********* */
private long totalTaskNum;
private long succeedTaskNum;
private long failedTaskNum;
// time of the last status report
private long lastReportTime;
}

View File

@ -4,6 +4,7 @@ import akka.actor.AbstractActor;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.server.core.InstanceManager;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import lombok.extern.slf4j.Slf4j;
@ -20,6 +21,7 @@ public class ServerActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder()
.match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat)
.match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq)
.match(Ping.class, this::onReceivePing)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build();
@ -48,7 +50,11 @@ public class ServerActor extends AbstractActor {
* 处理 instance 状态
* @param req 任务实例的状态上报请求
*/
private void onReceive(TaskTrackerReportInstanceStatusReq req) {
private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) {
    try {
        InstanceManager.updateStatus(req);
    } catch (Exception e) {
        // The exception is passed as the trailing argument so the logger records its
        // stack trace; without it the cause of the failure is lost.
        log.error("[ServerActor] update instance status failed for request: {}.", req, e);
    }
}
}

View File

@ -23,7 +23,7 @@ public class AppInfoDO {
private String appName;
private String description;
// 当前负责该 appName 旗下任务调度的server地址IP:Port
// 当前负责该 appName 旗下任务调度的server地址IP:Port注意该地址为ActorSystem地址而不是HTTP地址两者端口不同
private String currentServer;
private Date gmtCreate;

View File

@ -30,12 +30,15 @@ public class ExecuteLogDO {
private int status;
// 执行结果
private String result;
// 耗时
private Long usedTime;
// 预计触发时间
private Long expectedTriggerTime;
// 实际触发时间
private Long actualTriggerTime;
// 结束时间
private Long finishedTime;
// 总共执行的次数CRON任务 -> 代表重试次数FREQUENT -> 代表总执行次数
private Long runningTimes;
private Date gmtCreate;
private Date gmtModified;

View File

@ -4,6 +4,7 @@ import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import java.util.Date;
import java.util.List;
/**
@ -14,10 +15,30 @@ import java.util.List;
*/
public interface ExecuteLogRepository extends JpaRepository<ExecuteLogDO, Long> {

    /**
     * Counts the instances of the given job whose status is one of {@code status}.
     */
    long countByJobIdAndStatusIn(long jobId, List<Integer> status);

    // @Modifying marks the native statement as an UPDATE so it is executed as one instead
    // of being run as a result-returning query.
    // NOTE(review): modifying queries need an active transaction - confirm the callers
    // (or this interface) provide one.
    @Modifying
    @Query(value = "update execute_log set status = ?2, result = ?3 where instance_id = ?1", nativeQuery = true)
    int updateStatusAndLog(long instanceId, int status, String result);

    List<ExecuteLogDO> findByJobIdIn(List<Long> jobIds);

    /**
     * Updates an execution record when it is triggered (used by DispatchService).
     * NOTE(review): actual_trigger_time is set with SQL now(), while
     * ExecuteLogDO.actualTriggerTime is a Long that the status checker compares with epoch
     * milliseconds - confirm the column holds a compatible value.
     *
     * @param instanceId   id of the job instance
     * @param status       running status of the job instance
     * @param runningTimes number of runs
     * @param result       result text
     * @return number of updated rows
     */
    @Modifying
    @Query(value = "update execute_log set status = ?2, running_times = ?3, actual_trigger_time = now(), result = ?4, gmt_modified = now() where instance_id = ?1", nativeQuery = true)
    int update4Trigger(long instanceId, int status, long runningTimes, String result);

    /**
     * Updates status, running_times and gmt_modified of the record with the given instance id.
     *
     * @return number of updated rows
     */
    @Modifying
    @Query(value = "update execute_log set status = ?2, running_times = ?3, gmt_modified = now() where instance_id = ?1", nativeQuery = true)
    int update4FrequentJob(long instanceId, int status, long runningTimes);

    // Three status-check queries, one per stage: WAITING_DISPATCH, WAITING_WORKER_RECEIVE, RUNNING
    List<ExecuteLogDO> findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> jobIds, int status, long time);

    List<ExecuteLogDO> findByJobIdInAndStatusAndActualTriggerTimeLessThan(List<Long> jobIds, int status, long time);

    List<ExecuteLogDO> findByJobIdInAndStatusAndGmtModifiedBefore(List<Long> jobIds, int status, Date time);
}

View File

@ -20,7 +20,7 @@ import static com.github.kfcfans.common.InstanceStatus.*;
/**
* 派送服务
* 派送服务将任务从Server派发到Worker
*
* @author tjq
* @since 2020/4/5
@ -39,7 +39,7 @@ public class DispatchService {
private static final String NO_WORKER_REASON = "no worker available";
private static final String EMPTY_RESULT = "";
public void dispatch(JobInfoDO jobInfo, long instanceId) {
public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) {
log.debug("[DispatchService] start to dispatch job -> {}.", jobInfo);
@ -50,7 +50,7 @@ public class DispatchService {
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
String result = String.format(FAILED_REASON, runningInstanceCount);
log.warn("[DispatchService] cancel dispatch job({}) due to too much instance(num={}) is running.", jobInfo, runningInstanceCount);
executeLogRepository.updateStatusAndLog(instanceId, FAILED.getV(), result);
executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, result);
return;
}
@ -61,7 +61,7 @@ public class DispatchService {
if (StringUtils.isEmpty(taskTrackerAddress)) {
log.warn("[DispatchService] cancel dispatch job({}) due to no worker available.", jobInfo);
executeLogRepository.updateStatusAndLog(instanceId, FAILED.getV(), NO_WORKER_REASON);
executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, NO_WORKER_REASON);
return;
}
@ -89,8 +89,9 @@ public class DispatchService {
// 发送请求不可靠需要一个后台线程定期轮询状态
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(taskTrackerAddress);
taskTrackerActor.tell(req, null);
log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString());
// 修改状态
executeLogRepository.updateStatusAndLog(instanceId, WAITING_WORKER_RECEIVE.getV(), EMPTY_RESULT);
executeLogRepository.update4Trigger(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, EMPTY_RESULT);
}
}

View File

@ -0,0 +1,109 @@
package com.github.kfcfans.oms.server.service.timing;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.service.DispatchService;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Scheduled check that looks for job instances stuck in WAITING_DISPATCH,
 * WAITING_WORKER_RECEIVE or RUNNING for longer than the configured timeout and
 * dispatches them again.
 *
 * @author tjq
 * @since 2020/4/7
 */
@Slf4j
@Service
public class InstanceStatusCheckService {

    // number of ids handled per query batch
    private static final int MAX_BATCH_NUM = 10;
    // timeouts (ms) after which an instance in the respective stage is dispatched again
    private static final long DISPATCH_TIMEOUT_MS = 10000;
    private static final long RECEIVE_TIMEOUT_MS = 60000;
    private static final long RUNNING_TIMEOUT_MS = 60000;

    @Resource
    private DispatchService dispatchService;
    @Resource
    private AppInfoRepository appInfoRepository;
    @Resource
    private ExecuteLogRepository executeLogRepository;
    @Resource
    private JobInfoRepository jobInfoRepository;

    /**
     * Entry point invoked by the scheduler every 10 seconds; failures of the check are
     * logged and never propagate to the scheduler.
     */
    @Scheduled(fixedRate = 10000)
    public void timingStatusCheck() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            innerCheck();
        } catch (Exception e) {
            log.error("[InstanceStatusCheckService] status check failed.", e);
        }
        log.info("[InstanceStatusCheckService] status check using {}.", stopwatch.stop());
    }

    /** Loads the apps assigned to this server and checks them in batches of MAX_BATCH_NUM. */
    private void innerCheck() {

        // Ask the database which apps this server is currently responsible for.
        List<AppInfoDO> appInfoList = appInfoRepository.findAllByCurrentServer(OhMyServer.getActorSystemAddress());
        if (CollectionUtils.isEmpty(appInfoList)) {
            log.info("[InstanceStatusCheckService] current server has no app's job to check");
            return;
        }
        List<Long> allAppIds = appInfoList.stream().map(AppInfoDO::getId).collect(Collectors.toList());

        Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(this::checkPartition);
    }

    /**
     * Runs the three stage checks for one batch of ids.
     * NOTE(review): the ids come from AppInfoDO::getId but are passed to the
     * findByJobIdIn... queries - confirm app ids are really what these queries expect.
     */
    private void checkPartition(List<Long> partAppIds) {

        // 1. WAITING_DISPATCH: the expected trigger time passed without a dispatch.
        long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
        List<ExecuteLogDO> waitingDispatchInstances = executeLogRepository.findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold);
        if (!CollectionUtils.isEmpty(waitingDispatchInstances)) {
            log.warn("[InstanceStatusCheckService] instances({}) is not triggered as expected.", waitingDispatchInstances);
            waitingDispatchInstances.forEach(instance -> redispatch(instance, 0));
        }

        // 2. WAITING_WORKER_RECEIVE: dispatched, but the worker did not answer in time.
        threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
        List<ExecuteLogDO> waitingWorkerReceiveInstances = executeLogRepository.findByJobIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold);
        if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) {
            log.warn("[InstanceStatusCheckService] instances({}) did nt receive any reply from worker.", waitingWorkerReceiveInstances);
            waitingWorkerReceiveInstances.forEach(instance -> redispatch(instance, 0));
        }

        // 3. RUNNING: no status report from the TaskTracker for too long, treated as failed.
        threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
        List<ExecuteLogDO> failedInstances = executeLogRepository.findByJobIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold));
        if (!CollectionUtils.isEmpty(failedInstances)) {
            log.warn("[InstanceStatusCheckService] instances({}) has not received status report for a long time.", failedInstances);
            failedInstances.forEach(instance -> {
                // runningTimes is a nullable Long; treat a missing value as "never ran".
                Long runningTimes = instance.getRunningTimes();
                redispatch(instance, runningTimes == null ? 0 : runningTimes);
            });
        }
    }

    /**
     * Dispatches one instance again. Failures are logged and contained so that a single
     * broken record cannot abort the check for every other instance.
     */
    private void redispatch(ExecuteLogDO instance, long currentRunningTimes) {
        try {
            JobInfoDO jobInfoDO = jobInfoRepository.getOne(instance.getJobId());
            dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), currentRunningTimes);
        } catch (Exception e) {
            log.error("[InstanceStatusCheckService] redispatch instance(instanceId={}) failed.", instance.getInstanceId(), e);
        }
    }
}

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.server.service.schedule;
package com.github.kfcfans.oms.server.service.timing.schedule;
import com.github.kfcfans.oms.server.common.utils.timewheel.HashedWheelTimer;

View File

@ -1,9 +1,10 @@
package com.github.kfcfans.oms.server.service.schedule;
package com.github.kfcfans.oms.server.service.timing.schedule;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.oms.server.common.constans.TimeExpressionType;
import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.core.InstanceManager;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
@ -68,6 +69,7 @@ public class JobScheduleService {
}
List<Long> allAppIds = allAppInfos.stream().map(AppInfoDO::getId).collect(Collectors.toList());
// 调度 CRON 表达式 JOB
try {
scheduleCornJob(allAppIds);
}catch (Exception e) {
@ -76,6 +78,7 @@ public class JobScheduleService {
log.info("[JobScheduleService] finished cron schedule, using time {}.", stopwatch);
stopwatch.reset().start();
// 调度 FIX_RATE FIX_DELAY JOB
try {
scheduleFrequentJob(allAppIds);
}catch (Exception e) {
@ -132,6 +135,10 @@ public class JobScheduleService {
// 2. 推入时间轮中等待调度执行
jobInfos.forEach(jobInfoDO -> {
Long instanceId = jobId2InstanceId.get(jobInfoDO.getId());
// 注册到任务实例管理中心
InstanceManager.register(instanceId, jobInfoDO);
long targetTriggerTime = jobInfoDO.getNextTriggerTime();
long delay = 0;
if (targetTriggerTime < nowTime) {
@ -141,9 +148,8 @@ public class JobScheduleService {
}
HashedWheelTimerHolder.TIMER.schedule(() -> {
dispatchService.dispatch(jobInfoDO, jobId2InstanceId.get(jobInfoDO.getId()));
dispatchService.dispatch(jobInfoDO, instanceId, 0);
}, delay, TimeUnit.MILLISECONDS);
});
// 3. 计算下一次调度时间忽略5S内的重复执行即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms

View File

@ -3,8 +3,10 @@ package com.github.kfcfans.oms.server.test;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.oms.server.common.constans.TimeExpressionType;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.model.OmsLockDO;
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository;
import org.assertj.core.util.Lists;
@ -30,6 +32,8 @@ public class RepositoryTest {
private JobInfoRepository jobInfoRepository;
@Resource
private OmsLockRepository omsLockRepository;
@Resource
private ExecuteLogRepository executeLogRepository;
/**
* 需要证明批量写入失败后会回滚
@ -52,4 +56,13 @@ public class RepositoryTest {
System.out.println(result);
}
/**
 * Saves an ExecuteLogDO with id 22 that carries only a trigger time and a result,
 * then flushes it through the repository.
 */
@Test
public void testUpdate() {
    final ExecuteLogDO record = new ExecuteLogDO();
    record.setResult("hahaha");
    record.setActualTriggerTime(System.currentTimeMillis());
    record.setId(22L);
    executeLogRepository.saveAndFlush(record);
}
}

View File

@ -346,6 +346,7 @@ public class TaskTracker {
req.setTotalTaskNum(finishedNum + unfinishedNum);
req.setSucceedTaskNum(succeedNum);
req.setFailedTaskNum(failedNum);
req.setReportTime(System.currentTimeMillis());
// 2. 如果未完成任务数为0判断是否真正结束并获取真正结束任务的执行结果
TaskDO resultTask = null;