[fix] issue#12 (mapreduce processor stuck when there exists multi TaskTracker)

This commit is contained in:
tjq 2020-06-16 14:16:28 +08:00
parent 69bbd9a5f8
commit 40682bbd34
18 changed files with 254 additions and 227 deletions

View File

@ -31,6 +31,8 @@ import java.util.Optional;
@Slf4j @Slf4j
public class ServerActor extends AbstractActor { public class ServerActor extends AbstractActor {
private InstanceManager instanceManager;
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
@ -57,7 +59,7 @@ public class ServerActor extends AbstractActor {
*/ */
private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) { private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) {
try { try {
InstanceManager.updateStatus(req); getInstanceManager().updateStatus(req);
// 结束状态成功/失败需要回复消息 // 结束状态成功/失败需要回复消息
if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) { if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) {
@ -105,4 +107,12 @@ public class ServerActor extends AbstractActor {
getSender().tell(askResponse, getSelf()); getSender().tell(askResponse, getSelf());
} }
// 不需要加锁 Spring IOC 中重复取并没什么问题
private InstanceManager getInstanceManager() {
if (instanceManager == null) {
instanceManager = SpringUtils.getBean(InstanceManager.class);
}
return instanceManager;
}
} }

View File

@ -54,6 +54,8 @@ public class InstanceInfoDO {
private Long actualTriggerTime; private Long actualTriggerTime;
// 结束时间 // 结束时间
private Long finishedTime; private Long finishedTime;
// 最后上报时间
private Long lastReportTime;
// TaskTracker地址 // TaskTracker地址
private String taskTrackerAddress; private String taskTrackerAddress;

View File

@ -36,6 +36,8 @@ import static com.github.kfcfans.powerjob.common.InstanceStatus.*;
@Service @Service
public class DispatchService { public class DispatchService {
@Resource
private InstanceManager instanceManager;
@Resource @Resource
private InstanceInfoRepository instanceInfoRepository; private InstanceInfoRepository instanceInfoRepository;
@ -71,7 +73,7 @@ public class DispatchService {
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount); log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now);
InstanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result); instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result);
return; return;
} }
@ -96,7 +98,7 @@ public class DispatchService {
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available, clusterStatus is {}.", jobId, instanceId, clusterStatusDescription); log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available, clusterStatus is {}.", jobId, instanceId, clusterStatusDescription);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, dbInstanceParams, now); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, dbInstanceParams, now);
InstanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE); instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE);
return; return;
} }
@ -107,9 +109,6 @@ public class DispatchService {
} }
} }
// 注册到任务实例管理中心
InstanceManager.register(instanceId, jobInfo);
// 构造请求 // 构造请求
ServerScheduleJobReq req = new ServerScheduleJobReq(); ServerScheduleJobReq req = new ServerScheduleJobReq();
BeanUtils.copyProperties(jobInfo, req); BeanUtils.copyProperties(jobInfo, req);

View File

@ -45,6 +45,8 @@ import java.util.stream.Stream;
@Service @Service
public class InstanceLogService { public class InstanceLogService {
@Resource
private InstanceManager instanceManager;
@Resource @Resource
private GridFsManager gridFsManager; private GridFsManager gridFsManager;
// 本地数据库操作bean // 本地数据库操作bean
@ -195,8 +197,8 @@ public class InstanceLogService {
} }
// 删除本地数据库数据 // 删除本地数据库数据
try { try {
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
instanceId2LastReportTime.remove(instanceId); instanceId2LastReportTime.remove(instanceId);
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
}catch (Exception e) { }catch (Exception e) {
log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e); log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
} }
@ -315,10 +317,10 @@ public class InstanceLogService {
@Scheduled(fixedDelay = 60000) @Scheduled(fixedDelay = 60000)
public void timingCheck() { public void timingCheck() {
// 1. 定时删除秒级任务的日志 // 定时删除秒级任务的日志
List<Long> frequentInstanceIds = Lists.newLinkedList(); List<Long> frequentInstanceIds = Lists.newLinkedList();
instanceId2LastReportTime.keySet().forEach(instanceId -> { instanceId2LastReportTime.keySet().forEach(instanceId -> {
JobInfoDO jobInfo = InstanceManager.fetchJobInfo(instanceId); JobInfoDO jobInfo = instanceManager.fetchJobInfo(instanceId);
if (jobInfo == null) { if (jobInfo == null) {
return; return;
} }
@ -340,7 +342,7 @@ public class InstanceLogService {
}); });
} }
// 2. 删除长时间未 REPORT 的日志 // 删除长时间未 REPORT 的日志必要性考证中......
} }

View File

@ -16,13 +16,15 @@ import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarmContent; import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarmContent;
import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder; import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager; import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
import com.google.common.collect.Maps; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -33,114 +35,112 @@ import java.util.concurrent.TimeUnit;
* @since 2020/4/7 * @since 2020/4/7
*/ */
@Slf4j @Slf4j
@Service
public class InstanceManager { public class InstanceManager {
// 存储 instanceId 对应的 Job 信息便于重试 // 存储 instanceId 对应的 Job 信息便于重试
private static Map<Long, JobInfoDO> instanceId2JobInfo = Maps.newConcurrentMap(); private static Cache<Long, JobInfoDO> instanceId2JobInfo;
// 存储 instance 的状态暂时只用到了 lastReportTime
private static Map<Long, InstanceStatusHolder> instanceId2StatusHolder = Maps.newConcurrentMap();
// Spring Bean // Spring Bean
private static DispatchService dispatchService; @Resource
private static InstanceLogService instanceLogService; private DispatchService dispatchService;
private static InstanceInfoRepository instanceInfoRepository; @Resource
private static JobInfoRepository jobInfoRepository; private InstanceLogService instanceLogService;
private static Alarmable omsCenterAlarmService; @Resource(name = "omsCenterAlarmService")
private static WorkflowInstanceManager workflowInstanceManager; private Alarmable omsCenterAlarmService;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
/** private static final int CACHE_CONCURRENCY_LEVEL = 8;
* 注册到任务实例管理器 private static final int CACHE_MAX_SIZE = 4096;
* @param instanceId 即将运行的任务实例ID
* @param jobInfoDO 即将运行的任务实例对应的任务元数据
*/
public static void register(Long instanceId, JobInfoDO jobInfoDO) {
InstanceStatusHolder statusHolder = new InstanceStatusHolder(); static {
statusHolder.setInstanceId(instanceId); instanceId2JobInfo = CacheBuilder.newBuilder()
statusHolder.setInstanceStatus(InstanceStatus.WAITING_DISPATCH.getV()); .concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
.maximumSize(CACHE_MAX_SIZE)
instanceId2JobInfo.put(instanceId, jobInfoDO); .build();
instanceId2StatusHolder.put(instanceId, statusHolder);
} }
/** /**
* 更新任务状态 * 更新任务状态
* @param req TaskTracker上报任务实例状态的请求 * @param req TaskTracker上报任务实例状态的请求
*/ */
public static void updateStatus(TaskTrackerReportInstanceStatusReq req) { public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws Exception {
Long jobId = req.getJobId(); Long jobId = req.getJobId();
Long instanceId = req.getInstanceId(); Long instanceId = req.getInstanceId();
// 不存在可能该任务实例刚经历Server变更需要重新构建基础信息 // 获取相关数据
if (!instanceId2JobInfo.containsKey(instanceId)) { JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId, () -> {
log.warn("[InstanceManager] can't find any register info for instance(jobId={},instanceId={}), maybe change the server.", jobId, instanceId); Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobIno by jobId: " + jobId));
});
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
Optional<JobInfoDO> jobInfoDOOptional = getJobInfoRepository().findById(jobId); // 丢弃过期的上报数据
if (jobInfoDOOptional.isPresent()) { if (req.getReportTime() <= instanceInfo.getLastReportTime()) {
JobInfoDO JobInfoDo = jobInfoDOOptional.get(); log.warn("[InstanceManager-{}] receive the expired status report request: {}, this report will br dropped.", instanceId, req);
instanceId2JobInfo.put(instanceId, JobInfoDo); return;
}else {
throw new IllegalArgumentException("can't find JobIno by jobId:" + jobId);
}
} }
// 更新本地保存的任务实例状态用于未完成任务前的详细信息查询和缓存加速 // 丢弃非目标 TaskTracker 的上报数据脑裂情况
InstanceStatusHolder statusHolder = instanceId2StatusHolder.computeIfAbsent(instanceId, ignore -> new InstanceStatusHolder()); if (!req.getSourceAddress().equals(instanceInfo.getTaskTrackerAddress())) {
if (req.getReportTime() > statusHolder.getLastReportTime()) { log.warn("[InstanceManager-{}] receive the other TaskTracker's report: {}, but current TaskTracker is {}, this report will br dropped.", instanceId, req, instanceInfo.getTaskTrackerAddress());
BeanUtils.copyProperties(req, statusHolder);
statusHolder.setLastReportTime(req.getReportTime());
}else {
log.warn("[InstanceManager] receive the expired status report request: {}.", req);
return; return;
} }
InstanceStatus newStatus = InstanceStatus.of(req.getInstanceStatus()); InstanceStatus newStatus = InstanceStatus.of(req.getInstanceStatus());
Integer timeExpressionType = instanceId2JobInfo.get(instanceId).getTimeExpressionType(); Integer timeExpressionType = jobInfo.getTimeExpressionType();
instanceInfo.setStatus(newStatus.getV());
instanceInfo.setLastReportTime(req.getReportTime());
instanceInfo.setGmtModified(new Date());
// FREQUENT 任务没有失败重试机制TaskTracker一直运行即可只需要将存活信息同步到DB即可 // FREQUENT 任务没有失败重试机制TaskTracker一直运行即可只需要将存活信息同步到DB即可
// FREQUENT 任务的 newStatus 只有2中情况一种是 RUNNING一种是 FAILED表示该机器 overload需要重新选一台机器执行 // FREQUENT 任务的 newStatus 只有2中情况一种是 RUNNING一种是 FAILED表示该机器 overload需要重新选一台机器执行
// 综上直接把 status runningNum 同步到DB即可 // 综上直接把 status runningNum 同步到DB即可
if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) { if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) {
getInstanceInfoRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum(), new Date());
instanceInfo.setRunningTimes(req.getTotalTaskNum());
instanceInfoRepository.saveAndFlush(instanceInfo);
return; return;
} }
InstanceInfoDO updateEntity = getInstanceInfoRepository().findByInstanceId(instanceId);
updateEntity.setStatus(newStatus.getV());
updateEntity.setGmtModified(new Date());
boolean finished = false; boolean finished = false;
if (newStatus == InstanceStatus.SUCCEED) { if (newStatus == InstanceStatus.SUCCEED) {
updateEntity.setResult(req.getResult()); instanceInfo.setResult(req.getResult());
updateEntity.setFinishedTime(System.currentTimeMillis()); instanceInfo.setFinishedTime(System.currentTimeMillis());
finished = true; finished = true;
}else if (newStatus == InstanceStatus.FAILED) { }else if (newStatus == InstanceStatus.FAILED) {
// 当前重试次数 <= 最大重试次数进行重试 第一次运行runningTimes为1重试一次instanceRetryNum也为1故需要 = // 当前重试次数 <= 最大重试次数进行重试 第一次运行runningTimes为1重试一次instanceRetryNum也为1故需要 =
if (updateEntity.getRunningTimes() <= instanceId2JobInfo.get(instanceId).getInstanceRetryNum()) { if (instanceInfo.getRunningTimes() <= jobInfo.getInstanceRetryNum()) {
log.info("[InstanceManager] instance(instanceId={}) execute failed but will take the {}th retry.", instanceId, updateEntity.getRunningTimes()); log.info("[InstanceManager-{}] instance execute failed but will take the {}th retry.", instanceId, instanceInfo.getRunningTimes());
// 延迟10S重试由于重试不改变 instanceId如果派发到同一台机器上一个 TaskTracker 还处于资源释放阶段无法创建新的TaskTracker任务失败 // 延迟10S重试由于重试不改变 instanceId如果派发到同一台机器上一个 TaskTracker 还处于资源释放阶段无法创建新的TaskTracker任务失败
HashedWheelTimerHolder.TIMER.schedule(() -> { HashedWheelTimerHolder.TIMER.schedule(() -> {
getDispatchService().redispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes()); dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes());
}, 10, TimeUnit.SECONDS); }, 10, TimeUnit.SECONDS);
// 修改状态为 等待派发正式开始重试 // 修改状态为 等待派发正式开始重试
// 问题会丢失以往的调度记录actualTriggerTime什么的都会被覆盖 // 问题会丢失以往的调度记录actualTriggerTime什么的都会被覆盖
updateEntity.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
}else { }else {
updateEntity.setResult(req.getResult()); instanceInfo.setResult(req.getResult());
updateEntity.setFinishedTime(System.currentTimeMillis()); instanceInfo.setFinishedTime(System.currentTimeMillis());
finished = true; finished = true;
log.info("[InstanceManager] instance(instanceId={}) execute failed and have no chance to retry.", instanceId); log.info("[InstanceManager-{}] instance execute failed and have no chance to retry.", instanceId);
} }
} }
// 同步状态变更信息到数据库 // 同步状态变更信息到数据库
getInstanceInfoRepository().saveAndFlush(updateEntity); instanceInfoRepository.saveAndFlush(instanceInfo);
if (finished) { if (finished) {
// 这里的 InstanceStatus 只有 成功/失败 两种手动停止不会由 TaskTracker 上报 // 这里的 InstanceStatus 只有 成功/失败 两种手动停止不会由 TaskTracker 上报
@ -155,53 +155,53 @@ public class InstanceManager {
* @param status 任务状态 成功/失败/手动停止 * @param status 任务状态 成功/失败/手动停止
* @param result 执行结果 * @param result 执行结果
*/ */
public static void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) { public void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) {
log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name()); log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name());
// 清除已完成的实例信息
instanceId2StatusHolder.remove(instanceId);
// 这一步也可能导致后面取不到 JobInfoDO
JobInfoDO jobInfo = instanceId2JobInfo.remove(instanceId);
// 上报日志数据 // 上报日志数据
getInstanceLogService().sync(instanceId); instanceLogService.sync(instanceId);
// workflow 特殊处理 // workflow 特殊处理
if (wfInstanceId != null) { if (wfInstanceId != null) {
// 手动停止在工作流中也认为是失败理论上不应该发生 // 手动停止在工作流中也认为是失败理论上不应该发生
getWorkflowInstanceManager().move(wfInstanceId, instanceId, status, result); workflowInstanceManager.move(wfInstanceId, instanceId, status, result);
} }
// 告警 // 告警
if (status == InstanceStatus.FAILED) { if (status == InstanceStatus.FAILED) {
JobInfoDO jobInfo = fetchJobInfo(instanceId);
if (jobInfo == null) {
jobInfo = fetchJobInfo(instanceId);
}
if (jobInfo == null) { if (jobInfo == null) {
log.warn("[InstanceManager] can't find jobInfo by instanceId({}), alarm failed.", instanceId); log.warn("[InstanceManager] can't find jobInfo by instanceId({}), alarm failed.", instanceId);
return; return;
} }
InstanceInfoDO instanceInfo = getInstanceInfoRepository().findByInstanceId(instanceId); InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
JobInstanceAlarmContent content = new JobInstanceAlarmContent(); JobInstanceAlarmContent content = new JobInstanceAlarmContent();
BeanUtils.copyProperties(jobInfo, content); BeanUtils.copyProperties(jobInfo, content);
BeanUtils.copyProperties(instanceInfo, content); BeanUtils.copyProperties(instanceInfo, content);
List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds()); List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds());
getAlarmService().onJobInstanceFailed(content, userList); omsCenterAlarmService.onJobInstanceFailed(content, userList);
} }
// 过期缓存
instanceId2JobInfo.invalidate(instanceId);
} }
public static JobInfoDO fetchJobInfo(Long instanceId) { /**
JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId); * 根据任务实例ID查询任务相关信息
* @param instanceId 任务实例ID
* @return 任务元数据
*/
public JobInfoDO fetchJobInfo(Long instanceId) {
JobInfoDO jobInfo = instanceId2JobInfo.getIfPresent(instanceId);
if (jobInfo != null) { if (jobInfo != null) {
return jobInfo; return jobInfo;
} }
InstanceInfoDO instanceInfo = getInstanceInfoRepository().findByInstanceId(instanceId); InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfo != null) { if (instanceInfo != null) {
return getJobInfoRepository().findById(instanceInfo.getJobId()).orElse(null); return jobInfoRepository.findById(instanceInfo.getJobId()).orElse(null);
} }
return null; return null;
} }
@ -209,74 +209,7 @@ public class InstanceManager {
/** /**
* 释放本地缓存防止内存泄漏 * 释放本地缓存防止内存泄漏
*/ */
public static void releaseInstanceInfos() { public static void releaseCache() {
instanceId2JobInfo = Maps.newConcurrentMap(); instanceId2JobInfo.cleanUp();
instanceId2StatusHolder = Maps.newConcurrentMap();
}
private static InstanceInfoRepository getInstanceInfoRepository() {
while (instanceInfoRepository == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
instanceInfoRepository = SpringUtils.getBean(InstanceInfoRepository.class);
}
return instanceInfoRepository;
}
private static JobInfoRepository getJobInfoRepository() {
while (jobInfoRepository == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class);
}
return jobInfoRepository;
}
private static DispatchService getDispatchService() {
while (dispatchService == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
dispatchService = SpringUtils.getBean(DispatchService.class);
}
return dispatchService;
}
private static InstanceLogService getInstanceLogService() {
while (instanceLogService == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
instanceLogService = SpringUtils.getBean(InstanceLogService.class);
}
return instanceLogService;
}
private static Alarmable getAlarmService() {
while (omsCenterAlarmService == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
omsCenterAlarmService = (Alarmable) SpringUtils.getBean("omsCenterAlarmService");
}
return omsCenterAlarmService;
}
private static WorkflowInstanceManager getWorkflowInstanceManager() {
while (workflowInstanceManager == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
workflowInstanceManager = SpringUtils.getBean(WorkflowInstanceManager.class);
}
return workflowInstanceManager;
} }
} }

View File

@ -41,6 +41,8 @@ public class InstanceService {
@Resource @Resource
private IdGenerateService idGenerateService; private IdGenerateService idGenerateService;
@Resource @Resource
private InstanceManager instanceManager;
@Resource
private InstanceInfoRepository instanceInfoRepository; private InstanceInfoRepository instanceInfoRepository;
/** /**
@ -67,6 +69,7 @@ public class InstanceService {
newInstanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); newInstanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
newInstanceInfo.setExpectedTriggerTime(expectTriggerTime); newInstanceInfo.setExpectedTriggerTime(expectTriggerTime);
newInstanceInfo.setLastReportTime(-1L);
newInstanceInfo.setGmtCreate(now); newInstanceInfo.setGmtCreate(now);
newInstanceInfo.setGmtModified(now); newInstanceInfo.setGmtModified(now);
@ -101,7 +104,7 @@ public class InstanceService {
instanceInfo.setResult(SystemInstanceResult.STOPPED_BY_USER); instanceInfo.setResult(SystemInstanceResult.STOPPED_BY_USER);
instanceInfoRepository.saveAndFlush(instanceInfo); instanceInfoRepository.saveAndFlush(instanceInfo);
InstanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), STOPPED, SystemInstanceResult.STOPPED_BY_USER); instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), STOPPED, SystemInstanceResult.STOPPED_BY_USER);
/* /*
不可靠通知停止 TaskTracker 不可靠通知停止 TaskTracker

View File

@ -1,29 +0,0 @@
package com.github.kfcfans.powerjob.server.service.instance;
import lombok.Data;
/**
* 保存任务实例的状态信息
*
* @author tjq
* @since 2020/4/7
*/
@Data
public class InstanceStatusHolder {
private long instanceId;
private int instanceStatus;
private String result;
/* ********* 统计信息 ********* */
private long totalTaskNum;
private long succeedTaskNum;
private long failedTaskNum;
// 任务开始时间
private long startTime;
// 上次上报时间
private long lastReportTime;
// 源地址TaskTracker 地址
private String sourceAddress;
}

View File

@ -60,7 +60,7 @@ public class CleanService {
// 释放本地缓存 // 释放本地缓存
WorkerManagerService.releaseContainerInfos(); WorkerManagerService.releaseContainerInfos();
InstanceManager.releaseInstanceInfos(); InstanceManager.releaseCache();
// 删除数据库运行记录 // 删除数据库运行记录
cleanInstanceLog(); cleanInstanceLog();

View File

@ -44,6 +44,8 @@ public class InstanceStatusCheckService {
@Resource @Resource
private DispatchService dispatchService; private DispatchService dispatchService;
@Resource @Resource
private InstanceManager instanceManager;
@Resource
private WorkflowInstanceManager workflowInstanceManager; private WorkflowInstanceManager workflowInstanceManager;
@Resource @Resource
@ -190,6 +192,6 @@ public class InstanceStatusCheckService {
instance.setResult(SystemInstanceResult.REPORT_TIMEOUT); instance.setResult(SystemInstanceResult.REPORT_TIMEOUT);
instanceInfoRepository.saveAndFlush(instance); instanceInfoRepository.saveAndFlush(instance);
InstanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, "timeout, maybe TaskTracker down!"); instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, "timeout, maybe TaskTracker down!");
} }
} }

View File

@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
@ -169,10 +170,8 @@ public class OmsScheduleService {
jobInfos.forEach(jobInfoDO -> { jobInfos.forEach(jobInfoDO -> {
try { try {
CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression());
Date benchmarkTime = new Date(jobInfoDO.getNextTriggerTime()); Date nextTriggerTime = calculateNextTriggerTime(jobInfoDO.getNextTriggerTime(), jobInfoDO.getTimeExpression());
Date nextTriggerTime = cronExpression.getNextValidTimeAfter(benchmarkTime);
JobInfoDO updatedJobInfo = new JobInfoDO(); JobInfoDO updatedJobInfo = new JobInfoDO();
BeanUtils.copyProperties(jobInfoDO, updatedJobInfo); BeanUtils.copyProperties(jobInfoDO, updatedJobInfo);
@ -221,8 +220,7 @@ public class OmsScheduleService {
// 3. 重新计算下一次调度时间并更新 // 3. 重新计算下一次调度时间并更新
try { try {
CronExpression cronExpression = new CronExpression(wfInfo.getTimeExpression()); Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression());
Date nextTriggerTime = cronExpression.getNextValidTimeAfter(new Date(wfInfo.getNextTriggerTime()));
WorkflowInfoDO updateEntity = new WorkflowInfoDO(); WorkflowInfoDO updateEntity = new WorkflowInfoDO();
BeanUtils.copyProperties(wfInfo, updateEntity); BeanUtils.copyProperties(wfInfo, updateEntity);
@ -267,4 +265,18 @@ public class OmsScheduleService {
}); });
} }
/**
* 计算下次触发时间
* @param preTriggerTime 前一次触发时间
* @param cronExpression CRON 表达式
* @return 下一次调度时间
* @throws Exception 异常
*/
private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression) throws Exception {
CronExpression ce = new CronExpression(cronExpression);
// 取最大值防止长时间未调度任务被连续调度原来DISABLE的任务突然被打开不取最大值会补上过去所有的调度
long benchmarkTime = Math.max(System.currentTimeMillis(), preTriggerTime);
return ce.getNextValidTimeAfter(new Date(benchmarkTime));
}
} }

View File

@ -7,6 +7,9 @@ import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.List;
/** /**
* 普通计算节点处理来自 TaskTracker 的请求 * 普通计算节点处理来自 TaskTracker 的请求
@ -28,13 +31,14 @@ public class ProcessorTrackerActor extends AbstractActor {
/** /**
* 处理来自TaskTracker的task执行请求 * 处理来自TaskTracker的task执行请求
* @param req 请求
*/ */
private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) { private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
Long instanceId = req.getInstanceInfo().getInstanceId(); Long instanceId = req.getInstanceInfo().getInstanceId();
// 创建 ProcessorTracker 一定能成功且每个任务实例只会创建一个 ProcessorTracker // 创建 ProcessorTracker 一定能成功
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> new ProcessorTracker(req)); ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, req.getTaskTrackerAddress(), () -> new ProcessorTracker(req));
TaskDO task = new TaskDO(); TaskDO task = new TaskDO();
@ -47,14 +51,18 @@ public class ProcessorTrackerActor extends AbstractActor {
processorTracker.submitTask(task); processorTracker.submitTask(task);
} }
/**
* 处理来自TaskTracker停止任务的请求
* @param req 请求
*/
private void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) { private void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) {
Long instanceId = req.getInstanceId(); Long instanceId = req.getInstanceId();
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId); List<ProcessorTracker> removedPts = ProcessorTrackerPool.removeProcessorTracker(instanceId);
if (processorTracker == null) { if (CollectionUtils.isEmpty(removedPts)) {
log.warn("[ProcessorTrackerActor] ProcessorTracker for instance(instanceId={}) already destroyed.", instanceId); log.warn("[ProcessorTrackerActor] ProcessorTracker for instance(instanceId={}) already destroyed.", instanceId);
}else { }else {
processorTracker.destroy(); removedPts.forEach(ProcessorTracker::destroy);
} }
} }
} }

View File

@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq; import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO; import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
@ -14,6 +15,7 @@ import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatus
import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import javafx.concurrent.Task;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.List; import java.util.List;
@ -47,12 +49,20 @@ public class TaskTrackerActor extends AbstractActor {
*/ */
private void onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) { private void onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) {
int taskStatus = req.getStatus();
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId()); TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
// 结束状态需要回复接受成功
if (TaskStatus.finishedStatus.contains(taskStatus)) {
AskResponse askResponse = AskResponse.succeed(null);
getSender().tell(askResponse, getSelf());
}
// 手动停止 TaskTracker 的情况下会出现这种情况 // 手动停止 TaskTracker 的情况下会出现这种情况
if (taskTracker == null) { if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req); log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req);
} else { } else {
taskTracker.updateTaskStatus(req.getTaskId(), req.getStatus(), req.getReportTime(), req.getResult()); taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
} }
} }

View File

@ -1,8 +1,11 @@
package com.github.kfcfans.powerjob.worker.common.constants; package com.github.kfcfans.powerjob.worker.common.constants;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import java.util.List;
/** /**
* 任务状态task_info 表中 status 字段的枚举值 * 任务状态task_info 表中 status 字段的枚举值
* *
@ -20,6 +23,8 @@ public enum TaskStatus {
WORKER_PROCESS_FAILED(5, "worker执行失败"), WORKER_PROCESS_FAILED(5, "worker执行失败"),
WORKER_PROCESS_SUCCESS(6, "worker执行成功"); WORKER_PROCESS_SUCCESS(6, "worker执行成功");
public static final List<Integer> finishedStatus = Lists.newArrayList(WORKER_PROCESS_FAILED.value, WORKER_PROCESS_SUCCESS.value);
private int value; private int value;
private String des; private String des;

View File

@ -1,15 +1,24 @@
package com.github.kfcfans.powerjob.worker.common.utils; package com.github.kfcfans.powerjob.worker.common.utils;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/** /**
* AKKA 工具类 * AKKA 工具类
* *
* @author tjq * @author tjq
* @since 2020/3/17 * @since 2020/3/17
*/ */
@Slf4j
public class AkkaUtils { public class AkkaUtils {
/** /**
@ -28,4 +37,21 @@ public class AkkaUtils {
return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName); return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName);
} }
/**
* 可靠传输
* @param remote 远程 AKKA 节点
* @param msg 需要传输的对象
* @return true: 对方接收成功 / false: 对方接收失败可能传输成功但对方处理失败需要协同处理 AskResponse 返回值
*/
public static boolean reliableTransmit(ActorSelection remote, Object msg) {
try {
CompletionStage<Object> ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse response = (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
return response.isSuccess();
}catch (Exception e) {
log.warn("[Oms-Transmitter] transmit {} failed, reason is {}", msg, e.toString());
}
return false;
}
} }

View File

@ -1,11 +1,15 @@
package com.github.kfcfans.powerjob.worker.core.executor; package com.github.kfcfans.powerjob.worker.core.executor;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore; import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils; import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils;
import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult;
import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.github.kfcfans.powerjob.worker.log.OmsLogger;
@ -25,7 +29,11 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/** /**
* Processor 执行器 * Processor 执行器
@ -45,6 +53,8 @@ public class ProcessorRunnable implements Runnable {
private final OmsLogger omsLogger; private final OmsLogger omsLogger;
// 类加载器 // 类加载器
private final ClassLoader classLoader; private final ClassLoader classLoader;
// 重试队列ProcessorTracker 将会定期重新上报处理结果
private final Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
public void innerRun() throws InterruptedException { public void innerRun() throws InterruptedException {
@ -174,7 +184,17 @@ public class ProcessorRunnable implements Runnable {
req.setResult(result); req.setResult(result);
req.setReportTime(System.currentTimeMillis()); req.setReportTime(System.currentTimeMillis());
taskTrackerActor.tell(req, null); // 最终结束状态要求可靠发送
if (TaskStatus.finishedStatus.contains(status.getValue())) {
boolean success = AkkaUtils.reliableTransmit(taskTrackerActor, req);
if (!success) {
// 插入重试队列等待重试
statusReportRetryQueue.add(req);
log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed.", task.getInstanceId(), task.getTaskId(), status, result);
}
}else {
taskTrackerActor.tell(req, null);
}
} }
@Override @Override
@ -185,7 +205,8 @@ public class ProcessorRunnable implements Runnable {
innerRun(); innerRun();
}catch (InterruptedException ignore) { }catch (InterruptedException ignore) {
}catch (Throwable e) { }catch (Throwable e) {
log.error("[ProcessorRunnable-{}] execute failed, please fix this bug @tjq!", task.getInstanceId(), e); reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString());
log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e);
}finally { }finally {
ThreadLocalStore.clear(); ThreadLocalStore.clear();
} }

View File

@ -1,9 +1,6 @@
package com.github.kfcfans.powerjob.worker.core.processor.sdk; package com.github.kfcfans.powerjob.worker.core.processor.sdk;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore; import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
@ -14,10 +11,7 @@ import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorMapTaskRequest;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/** /**
* Map 处理器允许开发者自定义拆分任务进行分布式执行 * Map 处理器允许开发者自定义拆分任务进行分布式执行
@ -53,20 +47,13 @@ public abstract class MapProcessor implements BasicProcessor {
ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName); ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);
// 2. 可靠发送请求任务不允许丢失需要使用 ask 方法失败抛异常 // 2. 可靠发送请求任务不允许丢失需要使用 ask 方法失败抛异常
boolean requestSucceed = false; String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME);
try { boolean requestSucceed = AkkaUtils.reliableTransmit(OhMyWorker.actorSystem.actorSelection(akkaRemotePath), req);
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME);
ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
CompletionStage<Object> requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS));
AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
requestSucceed = respObj.isSuccess();
}catch (Exception e) {
log.warn("[MapProcessor] map failed, exception is {}.", e.toString());
}
if (requestSucceed) { if (requestSucceed) {
return new ProcessResult(true, "MAP_SUCCESS"); return new ProcessResult(true, "MAP_SUCCESS");
}else { }else {
log.warn("[MapProcessor] map failed for {}", taskName);
return new ProcessResult(false, "MAP_FAILED"); return new ProcessResult(false, "MAP_FAILED");
} }
} }

View File

@ -21,11 +21,13 @@ import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatus
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
@ -50,6 +52,8 @@ public class ProcessorTracker {
private OmsContainer omsContainer; private OmsContainer omsContainer;
// 在线日志 // 在线日志
private OmsLogger omsLogger; private OmsLogger omsLogger;
// ProcessResult 上报失败的重试队列
private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
private String taskTrackerAddress; private String taskTrackerAddress;
private ActorSelection taskTrackerActorRef; private ActorSelection taskTrackerActorRef;
@ -77,6 +81,7 @@ public class ProcessorTracker {
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
this.omsLogger = new OmsServerLogger(instanceId); this.omsLogger = new OmsServerLogger(instanceId);
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
// 初始化 线程池TimingPool 启动的任务会检查 ThreadPool所以必须先初始化线程池否则NPE // 初始化 线程池TimingPool 启动的任务会检查 ThreadPool所以必须先初始化线程池否则NPE
initThreadPool(); initThreadPool();
@ -119,7 +124,7 @@ public class ProcessorTracker {
newTask.setAddress(taskTrackerAddress); newTask.setAddress(taskTrackerAddress);
ClassLoader classLoader = omsContainer == null ? getClass().getClassLoader() : omsContainer.getContainerClassLoader(); ClassLoader classLoader = omsContainer == null ? getClass().getClassLoader() : omsContainer.getContainerClassLoader();
ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader); ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader, statusReportRetryQueue);
try { try {
threadPool.submit(processorRunnable); threadPool.submit(processorRunnable);
success = true; success = true;
@ -165,6 +170,7 @@ public class ProcessorTracker {
// 2. 去除顶层引用送入GC世界 // 2. 去除顶层引用送入GC世界
taskTrackerActorRef = null; taskTrackerActorRef = null;
statusReportRetryQueue.clear();
ProcessorTrackerPool.removeProcessorTracker(instanceId); ProcessorTrackerPool.removeProcessorTracker(instanceId);
log.info("[ProcessorTracker-{}] ProcessorTracker already destroyed!", instanceId); log.info("[ProcessorTracker-{}] ProcessorTracker already destroyed!", instanceId);
@ -224,6 +230,19 @@ public class ProcessorTracker {
} }
} }
// 上报状态之前先重新发送失败的任务只要有结果堆积就不上报状态 PT 认为该 TT 失联然后重试相关任务
while (!statusReportRetryQueue.isEmpty()) {
ProcessorReportTaskStatusReq req = statusReportRetryQueue.poll();
if (req != null) {
req.setReportTime(System.currentTimeMillis());
if (!AkkaUtils.reliableTransmit(taskTrackerActorRef, req)) {
statusReportRetryQueue.add(req);
return;
}
}
}
// 上报当前 ProcessorTracker 负载
long waitingNum = threadPool.getQueue().size(); long waitingNum = threadPool.getQueue().size();
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceId, waitingNum); ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceId, waitingNum);
taskTrackerActorRef.tell(req, null); taskTrackerActorRef.tell(req, null);

View File

@ -1,9 +1,13 @@
package com.github.kfcfans.powerjob.worker.core.tracker.processor; package com.github.kfcfans.powerjob.worker.core.tracker.processor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier;
/** /**
* 持有 Processor 对象 * 持有 Processor 对象
@ -14,23 +18,36 @@ import java.util.function.Function;
*/ */
public class ProcessorTrackerPool { public class ProcessorTrackerPool {
private static final Map<Long, ProcessorTracker> instanceId2ProcessorTracker = Maps.newConcurrentMap(); // instanceId -> (TaskTrackerAddress -> ProcessorTracker)
// 处理脑裂情况下同一个 Instance 存在多个 TaskTracker 的情况
private static final Map<Long, Map<String, ProcessorTracker>> processorTrackerPool = Maps.newHashMap();
/** /**
* 获取 ProcessorTracker如果不存在则创建 * 获取 ProcessorTracker如果不存在则创建
*/ */
public static ProcessorTracker getProcessorTracker(Long instanceId, Function<Long, ProcessorTracker> creator) { public static ProcessorTracker getProcessorTracker(Long instanceId, String address, Supplier<ProcessorTracker> creator) {
return instanceId2ProcessorTracker.computeIfAbsent(instanceId, creator);
ProcessorTracker processorTracker = processorTrackerPool.getOrDefault(instanceId, Collections.emptyMap()).get(address);
if (processorTracker == null) {
synchronized (ProcessorTrackerPool.class) {
processorTracker = processorTrackerPool.getOrDefault(instanceId, Collections.emptyMap()).get(address);
if (processorTracker == null) {
processorTracker = creator.get();
processorTrackerPool.computeIfAbsent(instanceId, ignore -> Maps.newHashMap()).put(address, processorTracker);
}
}
}
return processorTracker;
} }
/** public static List<ProcessorTracker> removeProcessorTracker(Long instanceId) {
* 获取 ProcessorTracker
*/
public static ProcessorTracker getProcessorTracker(Long instanceId) {
return instanceId2ProcessorTracker.get(instanceId);
}
public static void removeProcessorTracker(Long instanceId) { List<ProcessorTracker> res = Lists.newLinkedList();
instanceId2ProcessorTracker.remove(instanceId); Map<String, ProcessorTracker> ttAddress2Pt = processorTrackerPool.remove(instanceId);
if (ttAddress2Pt != null) {
res.addAll(ttAddress2Pt.values());
ttAddress2Pt.clear();
}
return res;
} }
} }