[dev] idle ProcessorTracker auto destroy

This commit is contained in:
tjq 2020-06-21 18:57:45 +08:00
parent 10824c5c36
commit 51d7ba4743
7 changed files with 34 additions and 27 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -127,6 +127,7 @@ public class TaskTrackerActor extends AbstractActor {
* ProcessorTracker 心跳处理器 * ProcessorTracker 心跳处理器
*/ */
private void onReceiveProcessorTrackerStatusReportReq(ProcessorTrackerStatusReportReq req) { private void onReceiveProcessorTrackerStatusReportReq(ProcessorTrackerStatusReportReq req) {
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId()); TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
if (taskTracker == null) { if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive ProcessorTrackerStatusReportReq({}) but system can't find TaskTracker.", req); log.warn("[TaskTrackerActor] receive ProcessorTrackerStatusReportReq({}) but system can't find TaskTracker.", req);

View File

@ -48,8 +48,10 @@ public class ServerDiscoveryService {
String ip = currentServer.split(":")[0]; String ip = currentServer.split(":")[0];
// 直接请求当前Server的HTTP服务可以少一次网络开销减轻Server负担 // 直接请求当前Server的HTTP服务可以少一次网络开销减轻Server负担
String firstServerAddress = IP2ADDRESS.get(ip); String firstServerAddress = IP2ADDRESS.get(ip);
if (firstServerAddress != null) {
result = acquire(firstServerAddress); result = acquire(firstServerAddress);
} }
}
for (String httpServerAddress : OhMyWorker.getConfig().getServerAddress()) { for (String httpServerAddress : OhMyWorker.getConfig().getServerAddress()) {
if (StringUtils.isEmpty(result)) { if (StringUtils.isEmpty(result)) {

View File

@ -64,7 +64,7 @@ public class ProcessorTracker {
private ScheduledExecutorService timingPool; private ScheduledExecutorService timingPool;
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100; private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100;
// 最多长时间空闲的 ProcessorTracker 会发起销毁请求 // 长时间空闲的 ProcessorTracker 会发起销毁请求
private static final long MAX_IDLE_TIME = 120000; private static final long MAX_IDLE_TIME = 120000;
// ProcessorTracker 出现根本性错误比如 Processor 创建失败所有的任务直接失败 // ProcessorTracker 出现根本性错误比如 Processor 创建失败所有的任务直接失败
@ -237,18 +237,6 @@ public class ProcessorTracker {
} }
} }
// 上报状态之前先重新发送失败的任务只要有结果堆积就不上报状态 PT 认为该 TT 失联然后重试相关任务
while (!statusReportRetryQueue.isEmpty()) {
ProcessorReportTaskStatusReq req = statusReportRetryQueue.poll();
if (req != null) {
req.setReportTime(System.currentTimeMillis());
if (!AkkaUtils.reliableTransmit(taskTrackerActorRef, req)) {
statusReportRetryQueue.add(req);
return;
}
}
}
// 判断线程池活跃状态长时间空闲则上报 TaskTracker 请求检查 // 判断线程池活跃状态长时间空闲则上报 TaskTracker 请求检查
if (threadPool.getActiveCount() > 0) { if (threadPool.getActiveCount() > 0) {
lastIdleTime = -1; lastIdleTime = -1;
@ -258,10 +246,23 @@ public class ProcessorTracker {
}else { }else {
long idleTime = System.currentTimeMillis() - lastIdleTime; long idleTime = System.currentTimeMillis() - lastIdleTime;
if (idleTime > MAX_IDLE_TIME) { if (idleTime > MAX_IDLE_TIME) {
lastIdleTime = System.currentTimeMillis(); log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime);
log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker.", instanceId, idleTime);
// 不可靠通知如果该请求失败则整个任务处理集群缺失一个 ProcessorTracker影响可接受
taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildIdleReport(instanceId), null); taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildIdleReport(instanceId), null);
destroy();
return;
}
}
}
// 上报状态之前先重新发送失败的任务只要有结果堆积就不上报状态 PT 认为该 TT 失联然后重试相关任务
while (!statusReportRetryQueue.isEmpty()) {
ProcessorReportTaskStatusReq req = statusReportRetryQueue.poll();
if (req != null) {
req.setReportTime(System.currentTimeMillis());
if (!AkkaUtils.reliableTransmit(taskTrackerActorRef, req)) {
statusReportRetryQueue.add(req);
return; return;
} }
} }

View File

@ -272,9 +272,12 @@ public abstract class TaskTracker {
// 上报空闲检查是否已经接收到全部该 ProcessorTracker 负责的任务 // 上报空闲检查是否已经接收到全部该 ProcessorTracker 负责的任务
if (heartbeatReq.getType() == ProcessorTrackerStatusReportReq.IDLE) { if (heartbeatReq.getType() == ProcessorTrackerStatusReportReq.IDLE) {
List<TaskDO> unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, heartbeatReq.getAddress()); String idlePtAddress = heartbeatReq.getAddress();
// ProcessorTracker 已销毁重置为初始状态
ptStatusHolder.getProcessorTrackerStatus(idlePtAddress).setDispatched(false);
List<TaskDO> unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress);
if (!CollectionUtils.isEmpty(unfinishedTask)) { if (!CollectionUtils.isEmpty(unfinishedTask)) {
log.warn("[TaskTracker-{}] ProcessorTracker is idle now but have unfinished tasks: {}", instanceId, unfinishedTask); log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask);
unfinishedTask.forEach(task -> updateTaskStatus(task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result")); unfinishedTask.forEach(task -> updateTaskStatus(task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
} }
} }