diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java index 6c99e680..ba5b876c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java @@ -31,6 +31,8 @@ import java.util.Optional; @Slf4j public class ServerActor extends AbstractActor { + private InstanceManager instanceManager; + @Override public Receive createReceive() { return receiveBuilder() @@ -57,7 +59,7 @@ public class ServerActor extends AbstractActor { */ private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) { try { - InstanceManager.updateStatus(req); + getInstanceManager().updateStatus(req); // 结束状态(成功/失败)需要回复消息 if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) { @@ -105,4 +107,12 @@ public class ServerActor extends AbstractActor { getSender().tell(askResponse, getSelf()); } + + // 不需要加锁,从 Spring IOC 中重复取并没什么问题 + private InstanceManager getInstanceManager() { + if (instanceManager == null) { + instanceManager = SpringUtils.getBean(InstanceManager.class); + } + return instanceManager; + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java index 39f8d0a6..f06a4e50 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java @@ -54,6 +54,8 @@ public class InstanceInfoDO { private Long actualTriggerTime; // 结束时间 private Long finishedTime; + // 最后上报时间 + private Long lastReportTime; // TaskTracker地址 private String taskTrackerAddress; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index f5c60e1f..1f94bd77 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -36,6 +36,8 @@ import static com.github.kfcfans.powerjob.common.InstanceStatus.*; @Service public class DispatchService { + @Resource + private InstanceManager instanceManager; @Resource private InstanceInfoRepository instanceInfoRepository; @@ -71,7 +73,7 @@ public class DispatchService { log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now); - InstanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result); + instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result); return; } @@ -96,7 +98,7 @@ public class DispatchService { log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available, clusterStatus is {}.", jobId, instanceId, clusterStatusDescription); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, dbInstanceParams, now); - InstanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE); + instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE); return; } @@ -107,9 +109,6 @@ public class DispatchService { } } - // 注册到任务实例管理中心 - InstanceManager.register(instanceId, jobInfo); - // 构造请求 ServerScheduleJobReq req = new ServerScheduleJobReq(); BeanUtils.copyProperties(jobInfo, req); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java index 3db9d990..da3c03d5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java @@ -45,6 +45,8 @@ import java.util.stream.Stream; @Service public class InstanceLogService { + @Resource + private InstanceManager instanceManager; @Resource private GridFsManager gridFsManager; // 本地数据库操作bean @@ -195,8 +197,8 @@ public class InstanceLogService { } // 删除本地数据库数据 try { - CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); instanceId2LastReportTime.remove(instanceId); + CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); }catch (Exception e) { log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e); } @@ -315,10 +317,10 @@ public class InstanceLogService { @Scheduled(fixedDelay = 60000) public void timingCheck() { - // 1. 定时删除秒级任务的日志 + // 定时删除秒级任务的日志 List frequentInstanceIds = Lists.newLinkedList(); instanceId2LastReportTime.keySet().forEach(instanceId -> { - JobInfoDO jobInfo = InstanceManager.fetchJobInfo(instanceId); + JobInfoDO jobInfo = instanceManager.fetchJobInfo(instanceId); if (jobInfo == null) { return; } @@ -340,7 +342,7 @@ public class InstanceLogService { }); } - // 2. 删除长时间未 REPORT 的日志 + // 删除长时间未 REPORT 的日志(必要性考证中......) } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java index 522cb263..cb386261 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java @@ -16,13 +16,15 @@ import com.github.kfcfans.powerjob.server.service.alarm.Alarmable; import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarmContent; import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder; import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager; -import com.google.common.collect.Maps; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.util.Date; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -33,114 +35,112 @@ import java.util.concurrent.TimeUnit; * @since 2020/4/7 */ @Slf4j +@Service public class InstanceManager { // 存储 instanceId 对应的 Job 信息,便于重试 - private static Map instanceId2JobInfo = Maps.newConcurrentMap(); - // 存储 instance 的状态(暂时只用到了 lastReportTime) - private static Map instanceId2StatusHolder = Maps.newConcurrentMap(); + private static Cache instanceId2JobInfo; // Spring Bean - private static DispatchService dispatchService; - private static InstanceLogService instanceLogService; - private static InstanceInfoRepository instanceInfoRepository; - private static JobInfoRepository jobInfoRepository; - private static Alarmable omsCenterAlarmService; - private static WorkflowInstanceManager workflowInstanceManager; + @Resource + private DispatchService dispatchService; + @Resource + private InstanceLogService instanceLogService; + @Resource(name = "omsCenterAlarmService") + private Alarmable omsCenterAlarmService; + @Resource + private InstanceInfoRepository instanceInfoRepository; + @Resource + private JobInfoRepository jobInfoRepository; + @Resource + private WorkflowInstanceManager workflowInstanceManager; - /** - * 注册到任务实例管理器 - * @param instanceId 即将运行的任务实例ID - * @param jobInfoDO 即将运行的任务实例对应的任务元数据 - */ - public static void register(Long instanceId, JobInfoDO jobInfoDO) { + private static final int CACHE_CONCURRENCY_LEVEL = 8; + private static final int CACHE_MAX_SIZE = 4096; - InstanceStatusHolder statusHolder = new InstanceStatusHolder(); - statusHolder.setInstanceId(instanceId); - statusHolder.setInstanceStatus(InstanceStatus.WAITING_DISPATCH.getV()); - - instanceId2JobInfo.put(instanceId, jobInfoDO); - instanceId2StatusHolder.put(instanceId, statusHolder); + static { + instanceId2JobInfo = CacheBuilder.newBuilder() + .concurrencyLevel(CACHE_CONCURRENCY_LEVEL) + .maximumSize(CACHE_MAX_SIZE) + .build(); } /** * 更新任务状态 * @param req TaskTracker上报任务实例状态的请求 */ - public static void updateStatus(TaskTrackerReportInstanceStatusReq req) { + public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws Exception { Long jobId = req.getJobId(); Long instanceId = req.getInstanceId(); - // 不存在,可能该任务实例刚经历Server变更,需要重新构建基础信息 - if (!instanceId2JobInfo.containsKey(instanceId)) { - log.warn("[InstanceManager] can't find any register info for instance(jobId={},instanceId={}), maybe change the server.", jobId, instanceId); + // 获取相关数据 + JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId, () -> { + Optional jobInfoOpt = jobInfoRepository.findById(jobId); + return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobIno by jobId: " + jobId)); + }); + InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); - Optional jobInfoDOOptional = getJobInfoRepository().findById(jobId); - if (jobInfoDOOptional.isPresent()) { - JobInfoDO JobInfoDo = jobInfoDOOptional.get(); - instanceId2JobInfo.put(instanceId, JobInfoDo); - }else { - throw new IllegalArgumentException("can't find JobIno by jobId:" + jobId); - } + // 丢弃过期的上报数据 + if (req.getReportTime() <= instanceInfo.getLastReportTime()) { + log.warn("[InstanceManager-{}] receive the expired status report request: {}, this report will br dropped.", instanceId, req); + return; } - // 更新本地保存的任务实例状态(用于未完成任务前的详细信息查询和缓存加速) - InstanceStatusHolder statusHolder = instanceId2StatusHolder.computeIfAbsent(instanceId, ignore -> new InstanceStatusHolder()); - if (req.getReportTime() > statusHolder.getLastReportTime()) { - BeanUtils.copyProperties(req, statusHolder); - statusHolder.setLastReportTime(req.getReportTime()); - }else { - log.warn("[InstanceManager] receive the expired status report request: {}.", req); + // 丢弃非目标 TaskTracker 的上报数据(脑裂情况) + if (!req.getSourceAddress().equals(instanceInfo.getTaskTrackerAddress())) { + log.warn("[InstanceManager-{}] receive the other TaskTracker's report: {}, but current TaskTracker is {}, this report will br dropped.", instanceId, req, instanceInfo.getTaskTrackerAddress()); return; } InstanceStatus newStatus = InstanceStatus.of(req.getInstanceStatus()); - Integer timeExpressionType = instanceId2JobInfo.get(instanceId).getTimeExpressionType(); + Integer timeExpressionType = jobInfo.getTimeExpressionType(); + + instanceInfo.setStatus(newStatus.getV()); + instanceInfo.setLastReportTime(req.getReportTime()); + instanceInfo.setGmtModified(new Date()); // FREQUENT 任务没有失败重试机制,TaskTracker一直运行即可,只需要将存活信息同步到DB即可 // FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行) // 综上,直接把 status 和 runningNum 同步到DB即可 if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) { - getInstanceInfoRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum(), new Date()); + + instanceInfo.setRunningTimes(req.getTotalTaskNum()); + instanceInfoRepository.saveAndFlush(instanceInfo); return; } - InstanceInfoDO updateEntity = getInstanceInfoRepository().findByInstanceId(instanceId); - updateEntity.setStatus(newStatus.getV()); - updateEntity.setGmtModified(new Date()); - boolean finished = false; if (newStatus == InstanceStatus.SUCCEED) { - updateEntity.setResult(req.getResult()); - updateEntity.setFinishedTime(System.currentTimeMillis()); + instanceInfo.setResult(req.getResult()); + instanceInfo.setFinishedTime(System.currentTimeMillis()); finished = true; }else if (newStatus == InstanceStatus.FAILED) { // 当前重试次数 <= 最大重试次数,进行重试 (第一次运行,runningTimes为1,重试一次,instanceRetryNum也为1,故需要 =) - if (updateEntity.getRunningTimes() <= instanceId2JobInfo.get(instanceId).getInstanceRetryNum()) { + if (instanceInfo.getRunningTimes() <= jobInfo.getInstanceRetryNum()) { - log.info("[InstanceManager] instance(instanceId={}) execute failed but will take the {}th retry.", instanceId, updateEntity.getRunningTimes()); + log.info("[InstanceManager-{}] instance execute failed but will take the {}th retry.", instanceId, instanceInfo.getRunningTimes()); // 延迟10S重试(由于重试不改变 instanceId,如果派发到同一台机器,上一个 TaskTracker 还处于资源释放阶段,无法创建新的TaskTracker,任务失败) HashedWheelTimerHolder.TIMER.schedule(() -> { - getDispatchService().redispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes()); + dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes()); }, 10, TimeUnit.SECONDS); // 修改状态为 等待派发,正式开始重试 // 问题:会丢失以往的调度记录(actualTriggerTime什么的都会被覆盖) - updateEntity.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); + instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); }else { - updateEntity.setResult(req.getResult()); - updateEntity.setFinishedTime(System.currentTimeMillis()); + instanceInfo.setResult(req.getResult()); + instanceInfo.setFinishedTime(System.currentTimeMillis()); finished = true; - log.info("[InstanceManager] instance(instanceId={}) execute failed and have no chance to retry.", instanceId); + log.info("[InstanceManager-{}] instance execute failed and have no chance to retry.", instanceId); } } // 同步状态变更信息到数据库 - getInstanceInfoRepository().saveAndFlush(updateEntity); + instanceInfoRepository.saveAndFlush(instanceInfo); if (finished) { // 这里的 InstanceStatus 只有 成功/失败 两种,手动停止不会由 TaskTracker 上报 @@ -155,53 +155,53 @@ public class InstanceManager { * @param status 任务状态,有 成功/失败/手动停止 * @param result 执行结果 */ - public static void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) { + public void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) { log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name()); - // 清除已完成的实例信息 - instanceId2StatusHolder.remove(instanceId); - // 这一步也可能导致后面取不到 JobInfoDO - JobInfoDO jobInfo = instanceId2JobInfo.remove(instanceId); - // 上报日志数据 - getInstanceLogService().sync(instanceId); + instanceLogService.sync(instanceId); // workflow 特殊处理 if (wfInstanceId != null) { // 手动停止在工作流中也认为是失败(理论上不应该发生) - getWorkflowInstanceManager().move(wfInstanceId, instanceId, status, result); + workflowInstanceManager.move(wfInstanceId, instanceId, status, result); } // 告警 if (status == InstanceStatus.FAILED) { - - if (jobInfo == null) { - jobInfo = fetchJobInfo(instanceId); - } + JobInfoDO jobInfo = fetchJobInfo(instanceId); if (jobInfo == null) { log.warn("[InstanceManager] can't find jobInfo by instanceId({}), alarm failed.", instanceId); return; } - InstanceInfoDO instanceInfo = getInstanceInfoRepository().findByInstanceId(instanceId); + InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); JobInstanceAlarmContent content = new JobInstanceAlarmContent(); BeanUtils.copyProperties(jobInfo, content); BeanUtils.copyProperties(instanceInfo, content); List userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds()); - getAlarmService().onJobInstanceFailed(content, userList); + omsCenterAlarmService.onJobInstanceFailed(content, userList); } + + // 过期缓存 + instanceId2JobInfo.invalidate(instanceId); } - public static JobInfoDO fetchJobInfo(Long instanceId) { - JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId); + /** + * 根据任务实例ID查询任务相关信息 + * @param instanceId 任务实例ID + * @return 任务元数据 + */ + public JobInfoDO fetchJobInfo(Long instanceId) { + JobInfoDO jobInfo = instanceId2JobInfo.getIfPresent(instanceId); if (jobInfo != null) { return jobInfo; } - InstanceInfoDO instanceInfo = getInstanceInfoRepository().findByInstanceId(instanceId); + InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); if (instanceInfo != null) { - return getJobInfoRepository().findById(instanceInfo.getJobId()).orElse(null); + return jobInfoRepository.findById(instanceInfo.getJobId()).orElse(null); } return null; } @@ -209,74 +209,7 @@ public class InstanceManager { /** * 释放本地缓存,防止内存泄漏 */ - public static void releaseInstanceInfos() { - instanceId2JobInfo = Maps.newConcurrentMap(); - instanceId2StatusHolder = Maps.newConcurrentMap(); - } - - private static InstanceInfoRepository getInstanceInfoRepository() { - while (instanceInfoRepository == null) { - try { - Thread.sleep(100); - }catch (Exception ignore) { - } - instanceInfoRepository = SpringUtils.getBean(InstanceInfoRepository.class); - } - return instanceInfoRepository; - } - - private static JobInfoRepository getJobInfoRepository() { - while (jobInfoRepository == null) { - try { - Thread.sleep(100); - }catch (Exception ignore) { - } - jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class); - } - return jobInfoRepository; - } - - private static DispatchService getDispatchService() { - while (dispatchService == null) { - try { - Thread.sleep(100); - }catch (Exception ignore) { - } - dispatchService = SpringUtils.getBean(DispatchService.class); - } - return dispatchService; - } - - private static InstanceLogService getInstanceLogService() { - while (instanceLogService == null) { - try { - Thread.sleep(100); - }catch (Exception ignore) { - } - instanceLogService = SpringUtils.getBean(InstanceLogService.class); - } - return instanceLogService; - } - - private static Alarmable getAlarmService() { - while (omsCenterAlarmService == null) { - try { - Thread.sleep(100); - }catch (Exception ignore) { - } - omsCenterAlarmService = (Alarmable) SpringUtils.getBean("omsCenterAlarmService"); - } - return omsCenterAlarmService; - } - - private static WorkflowInstanceManager getWorkflowInstanceManager() { - while (workflowInstanceManager == null) { - try { - Thread.sleep(100); - }catch (Exception ignore) { - } - workflowInstanceManager = SpringUtils.getBean(WorkflowInstanceManager.class); - } - return workflowInstanceManager; + public static void releaseCache() { + instanceId2JobInfo.cleanUp(); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java index 3765c81c..526e4ac6 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java @@ -41,6 +41,8 @@ public class InstanceService { @Resource private IdGenerateService idGenerateService; @Resource + private InstanceManager instanceManager; + @Resource private InstanceInfoRepository instanceInfoRepository; /** @@ -67,6 +69,7 @@ public class InstanceService { newInstanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); newInstanceInfo.setExpectedTriggerTime(expectTriggerTime); + newInstanceInfo.setLastReportTime(-1L); newInstanceInfo.setGmtCreate(now); newInstanceInfo.setGmtModified(now); @@ -101,7 +104,7 @@ public class InstanceService { instanceInfo.setResult(SystemInstanceResult.STOPPED_BY_USER); instanceInfoRepository.saveAndFlush(instanceInfo); - InstanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), STOPPED, SystemInstanceResult.STOPPED_BY_USER); + instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), STOPPED, SystemInstanceResult.STOPPED_BY_USER); /* 不可靠通知停止 TaskTracker diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceStatusHolder.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceStatusHolder.java deleted file mode 100644 index 75bb0803..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceStatusHolder.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.github.kfcfans.powerjob.server.service.instance; - -import lombok.Data; - -/** - * 保存任务实例的状态信息 - * - * @author tjq - * @since 2020/4/7 - */ -@Data -public class InstanceStatusHolder { - - private long instanceId; - private int instanceStatus; - private String result; - - /* ********* 统计信息 ********* */ - private long totalTaskNum; - private long succeedTaskNum; - private long failedTaskNum; - - // 任务开始时间 - private long startTime; - // 上次上报时间 - private long lastReportTime; - // 源地址(TaskTracker 地址) - private String sourceAddress; -} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index 6d1993eb..e3b9fd88 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -60,7 +60,7 @@ public class CleanService { // 释放本地缓存 WorkerManagerService.releaseContainerInfos(); - InstanceManager.releaseInstanceInfos(); + InstanceManager.releaseCache(); // 删除数据库运行记录 cleanInstanceLog(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java index 36426247..ad677415 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java @@ -44,6 +44,8 @@ public class InstanceStatusCheckService { @Resource private DispatchService dispatchService; @Resource + private InstanceManager instanceManager; + @Resource private WorkflowInstanceManager workflowInstanceManager; @Resource @@ -190,6 +192,6 @@ public class InstanceStatusCheckService { instance.setResult(SystemInstanceResult.REPORT_TIMEOUT); instanceInfoRepository.saveAndFlush(instance); - InstanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, "timeout, maybe TaskTracker down!"); + instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, "timeout, maybe TaskTracker down!"); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index d5de14b7..0ebcd68a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.scheduling.annotation.Async; @@ -169,10 +170,8 @@ public class OmsScheduleService { jobInfos.forEach(jobInfoDO -> { try { - CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); - Date benchmarkTime = new Date(jobInfoDO.getNextTriggerTime()); - Date nextTriggerTime = cronExpression.getNextValidTimeAfter(benchmarkTime); + Date nextTriggerTime = calculateNextTriggerTime(jobInfoDO.getNextTriggerTime(), jobInfoDO.getTimeExpression()); JobInfoDO updatedJobInfo = new JobInfoDO(); BeanUtils.copyProperties(jobInfoDO, updatedJobInfo); @@ -221,8 +220,7 @@ public class OmsScheduleService { // 3. 重新计算下一次调度时间并更新 try { - CronExpression cronExpression = new CronExpression(wfInfo.getTimeExpression()); - Date nextTriggerTime = cronExpression.getNextValidTimeAfter(new Date(wfInfo.getNextTriggerTime())); + Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); WorkflowInfoDO updateEntity = new WorkflowInfoDO(); BeanUtils.copyProperties(wfInfo, updateEntity); @@ -267,4 +265,18 @@ public class OmsScheduleService { }); } + /** + * 计算下次触发时间 + * @param preTriggerTime 前一次触发时间 + * @param cronExpression CRON 表达式 + * @return 下一次调度时间 + * @throws Exception 异常 + */ + private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression) throws Exception { + + CronExpression ce = new CronExpression(cronExpression); + // 取最大值,防止长时间未调度任务被连续调度(原来DISABLE的任务突然被打开,不取最大值会补上过去所有的调度) + long benchmarkTime = Math.max(System.currentTimeMillis(), preTriggerTime); + return ce.getNextValidTimeAfter(new Date(benchmarkTime)); + } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java index 8d4a1796..3df267c7 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java @@ -7,6 +7,9 @@ import com.github.kfcfans.powerjob.worker.persistence.TaskDO; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; + +import java.util.List; /** * 普通计算节点,处理来自 TaskTracker 的请求 @@ -28,13 +31,14 @@ public class ProcessorTrackerActor extends AbstractActor { /** * 处理来自TaskTracker的task执行请求 + * @param req 请求 */ private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) { Long instanceId = req.getInstanceInfo().getInstanceId(); - // 创建 ProcessorTracker 一定能成功,且每个任务实例只会创建一个 ProcessorTracker - ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> new ProcessorTracker(req)); + // 创建 ProcessorTracker 一定能成功 + ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, req.getTaskTrackerAddress(), () -> new ProcessorTracker(req)); TaskDO task = new TaskDO(); @@ -47,14 +51,18 @@ public class ProcessorTrackerActor extends AbstractActor { processorTracker.submitTask(task); } + /** + * 处理来自TaskTracker停止任务的请求 + * @param req 请求 + */ private void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) { Long instanceId = req.getInstanceId(); - ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId); - if (processorTracker == null) { + List removedPts = ProcessorTrackerPool.removeProcessorTracker(instanceId); + if (CollectionUtils.isEmpty(removedPts)) { log.warn("[ProcessorTrackerActor] ProcessorTracker for instance(instanceId={}) already destroyed.", instanceId); }else { - processorTracker.destroy(); + removedPts.forEach(ProcessorTracker::destroy); } } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java index ec3d296d..a1f2f647 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java @@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq; +import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool; import com.github.kfcfans.powerjob.worker.persistence.TaskDO; @@ -14,6 +15,7 @@ import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatus import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.google.common.collect.Lists; +import javafx.concurrent.Task; import lombok.extern.slf4j.Slf4j; import java.util.List; @@ -47,12 +49,20 @@ public class TaskTrackerActor extends AbstractActor { */ private void onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) { + int taskStatus = req.getStatus(); TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId()); + + // 结束状态需要回复接受成功 + if (TaskStatus.finishedStatus.contains(taskStatus)) { + AskResponse askResponse = AskResponse.succeed(null); + getSender().tell(askResponse, getSelf()); + } + // 手动停止 TaskTracker 的情况下会出现这种情况 if (taskTracker == null) { log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req); } else { - taskTracker.updateTaskStatus(req.getTaskId(), req.getStatus(), req.getReportTime(), req.getResult()); + taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult()); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/constants/TaskStatus.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/constants/TaskStatus.java index 2dada51d..1ec13f93 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/constants/TaskStatus.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/constants/TaskStatus.java @@ -1,8 +1,11 @@ package com.github.kfcfans.powerjob.worker.common.constants; +import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Getter; +import java.util.List; + /** * 任务状态,task_info 表中 status 字段的枚举值 * @@ -20,6 +23,8 @@ public enum TaskStatus { WORKER_PROCESS_FAILED(5, "worker执行失败"), WORKER_PROCESS_SUCCESS(6, "worker执行成功"); + public static final List finishedStatus = Lists.newArrayList(WORKER_PROCESS_FAILED.value, WORKER_PROCESS_SUCCESS.value); + private int value; private String des; diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java index a0c168fb..42114b90 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java @@ -1,15 +1,24 @@ package com.github.kfcfans.powerjob.worker.common.utils; +import akka.actor.ActorSelection; +import akka.pattern.Patterns; +import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.common.RemoteConstant; +import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + /** * AKKA 工具类 * * @author tjq * @since 2020/3/17 */ +@Slf4j public class AkkaUtils { /** @@ -28,4 +37,21 @@ public class AkkaUtils { return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName); } + /** + * 可靠传输 + * @param remote 远程 AKKA 节点 + * @param msg 需要传输的对象 + * @return true: 对方接收成功 / false: 对方接收失败(可能传输成功但对方处理失败,需要协同处理 AskResponse 返回值) + */ + public static boolean reliableTransmit(ActorSelection remote, Object msg) { + try { + CompletionStage ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + AskResponse response = (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + return response.isSuccess(); + }catch (Exception e) { + log.warn("[Oms-Transmitter] transmit {} failed, reason is {}", msg, e.toString()); + } + return false; + } + } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java index 78b7c84b..1c6a9090 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java @@ -1,11 +1,15 @@ package com.github.kfcfans.powerjob.worker.core.executor; import akka.actor.ActorSelection; +import akka.pattern.Patterns; import com.github.kfcfans.powerjob.common.ExecuteType; +import com.github.kfcfans.powerjob.common.RemoteConstant; +import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; +import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; import com.github.kfcfans.powerjob.worker.log.OmsLogger; @@ -25,7 +29,11 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; +import java.time.Duration; import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; /** * Processor 执行器 @@ -45,6 +53,8 @@ public class ProcessorRunnable implements Runnable { private final OmsLogger omsLogger; // 类加载器 private final ClassLoader classLoader; + // 重试队列,ProcessorTracker 将会定期重新上报处理结果 + private final Queue statusReportRetryQueue; public void innerRun() throws InterruptedException { @@ -174,7 +184,17 @@ public class ProcessorRunnable implements Runnable { req.setResult(result); req.setReportTime(System.currentTimeMillis()); - taskTrackerActor.tell(req, null); + // 最终结束状态要求可靠发送 + if (TaskStatus.finishedStatus.contains(status.getValue())) { + boolean success = AkkaUtils.reliableTransmit(taskTrackerActor, req); + if (!success) { + // 插入重试队列,等待重试 + statusReportRetryQueue.add(req); + log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed.", task.getInstanceId(), task.getTaskId(), status, result); + } + }else { + taskTrackerActor.tell(req, null); + } } @Override @@ -185,7 +205,8 @@ public class ProcessorRunnable implements Runnable { innerRun(); }catch (InterruptedException ignore) { }catch (Throwable e) { - log.error("[ProcessorRunnable-{}] execute failed, please fix this bug @tjq!", task.getInstanceId(), e); + reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString()); + log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e); }finally { ThreadLocalStore.clear(); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java index a7a84f69..5f98e816 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java @@ -1,9 +1,6 @@ package com.github.kfcfans.powerjob.worker.core.processor.sdk; -import akka.actor.ActorSelection; -import akka.pattern.Patterns; import com.github.kfcfans.powerjob.common.RemoteConstant; -import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; @@ -14,10 +11,7 @@ import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorMapTaskRequest; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; -import java.time.Duration; import java.util.List; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; /** * Map 处理器,允许开发者自定义拆分任务进行分布式执行 @@ -53,20 +47,13 @@ public abstract class MapProcessor implements BasicProcessor { ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName); // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) - boolean requestSucceed = false; - try { - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME); - ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); - CompletionStage requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS)); - AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); - requestSucceed = respObj.isSuccess(); - }catch (Exception e) { - log.warn("[MapProcessor] map failed, exception is {}.", e.toString()); - } + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME); + boolean requestSucceed = AkkaUtils.reliableTransmit(OhMyWorker.actorSystem.actorSelection(akkaRemotePath), req); if (requestSucceed) { return new ProcessResult(true, "MAP_SUCCESS"); }else { + log.warn("[MapProcessor] map failed for {}", taskName); return new ProcessResult(false, "MAP_FAILED"); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index d8721b60..4d0cd01b 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -21,11 +21,13 @@ import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatus import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; +import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; import java.util.List; +import java.util.Queue; import java.util.concurrent.*; /** @@ -50,6 +52,8 @@ public class ProcessorTracker { private OmsContainer omsContainer; // 在线日志 private OmsLogger omsLogger; + // ProcessResult 上报失败的重试队列 + private Queue statusReportRetryQueue; private String taskTrackerAddress; private ActorSelection taskTrackerActorRef; @@ -77,6 +81,7 @@ public class ProcessorTracker { this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); this.omsLogger = new OmsServerLogger(instanceId); + this.statusReportRetryQueue = Queues.newLinkedBlockingQueue(); // 初始化 线程池,TimingPool 启动的任务会检查 ThreadPool,所以必须先初始化线程池,否则NPE initThreadPool(); @@ -119,7 +124,7 @@ public class ProcessorTracker { newTask.setAddress(taskTrackerAddress); ClassLoader classLoader = omsContainer == null ? getClass().getClassLoader() : omsContainer.getContainerClassLoader(); - ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader); + ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader, statusReportRetryQueue); try { threadPool.submit(processorRunnable); success = true; @@ -165,6 +170,7 @@ public class ProcessorTracker { // 2. 去除顶层引用,送入GC世界 taskTrackerActorRef = null; + statusReportRetryQueue.clear(); ProcessorTrackerPool.removeProcessorTracker(instanceId); log.info("[ProcessorTracker-{}] ProcessorTracker already destroyed!", instanceId); @@ -224,6 +230,19 @@ public class ProcessorTracker { } } + // 上报状态之前,先重新发送失败的任务,只要有结果堆积,就不上报状态(让 PT 认为该 TT 失联然后重试相关任务) + while (!statusReportRetryQueue.isEmpty()) { + ProcessorReportTaskStatusReq req = statusReportRetryQueue.poll(); + if (req != null) { + req.setReportTime(System.currentTimeMillis()); + if (!AkkaUtils.reliableTransmit(taskTrackerActorRef, req)) { + statusReportRetryQueue.add(req); + return; + } + } + } + + // 上报当前 ProcessorTracker 负载 long waitingNum = threadPool.getQueue().size(); ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceId, waitingNum); taskTrackerActorRef.tell(req, null); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTrackerPool.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTrackerPool.java index 9c2bdc0d..e6ffe539 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTrackerPool.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTrackerPool.java @@ -1,9 +1,13 @@ package com.github.kfcfans.powerjob.worker.core.tracker.processor; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.function.Supplier; /** * 持有 Processor 对象 @@ -14,23 +18,36 @@ import java.util.function.Function; */ public class ProcessorTrackerPool { - private static final Map instanceId2ProcessorTracker = Maps.newConcurrentMap(); + // instanceId -> (TaskTrackerAddress -> ProcessorTracker) + // 处理脑裂情况下同一个 Instance 存在多个 TaskTracker 的情况 + private static final Map> processorTrackerPool = Maps.newHashMap(); /** * 获取 ProcessorTracker,如果不存在则创建 */ - public static ProcessorTracker getProcessorTracker(Long instanceId, Function creator) { - return instanceId2ProcessorTracker.computeIfAbsent(instanceId, creator); + public static ProcessorTracker getProcessorTracker(Long instanceId, String address, Supplier creator) { + + ProcessorTracker processorTracker = processorTrackerPool.getOrDefault(instanceId, Collections.emptyMap()).get(address); + if (processorTracker == null) { + synchronized (ProcessorTrackerPool.class) { + processorTracker = processorTrackerPool.getOrDefault(instanceId, Collections.emptyMap()).get(address); + if (processorTracker == null) { + processorTracker = creator.get(); + processorTrackerPool.computeIfAbsent(instanceId, ignore -> Maps.newHashMap()).put(address, processorTracker); + } + } + } + return processorTracker; } - /** - * 获取 ProcessorTracker - */ - public static ProcessorTracker getProcessorTracker(Long instanceId) { - return instanceId2ProcessorTracker.get(instanceId); - } + public static List removeProcessorTracker(Long instanceId) { - public static void removeProcessorTracker(Long instanceId) { - instanceId2ProcessorTracker.remove(instanceId); + List res = Lists.newLinkedList(); + Map ttAddress2Pt = processorTrackerPool.remove(instanceId); + if (ttAddress2Pt != null) { + res.addAll(ttAddress2Pt.values()); + ttAddress2Pt.clear(); + } + return res; } }