fix: remove FrequentTaskTracker's heart beat,just using instanceTimeoutMS to avoid instance never stopped

This commit is contained in:
tjq 2020-10-08 21:05:41 +08:00
parent 6d8fe8e7ff
commit ab0757a87e
3 changed files with 13 additions and 30 deletions

View File

@ -63,7 +63,7 @@ public class ProcessorTracker {
private ThreadPoolExecutor threadPool;
private ScheduledExecutorService timingPool;
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100;
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;
// 长时间空闲的 ProcessorTracker 会发起销毁请求
private static final long MAX_IDLE_TIME = 300000;

View File

@ -105,14 +105,6 @@ public class FrequentTaskTracker extends TaskTracker {
scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS);
}
@Override
public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
super.updateTaskStatus(subInstanceId, taskId, newStatus, reportTime, result);
// 更新 LastActiveTime
SubInstanceTimeHolder timeHolder = subInstanceId2TimeHolder.get(subInstanceId);
timeHolder.lastActiveTime = Math.max(reportTime, timeHolder.lastActiveTime);
}
@Override
public InstanceDetail fetchRunningStatus() {
InstanceDetail detail = new InstanceDetail();
@ -181,7 +173,7 @@ public class FrequentTaskTracker extends TaskTracker {
if (maxInstanceNum > 0) {
if (timeExpressionType == TimeExpressionType.FIX_RATE) {
if (subInstanceId2TimeHolder.size() > maxInstanceNum) {
log.warn("[TaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId);
log.warn("[FQTaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId);
processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE");
return;
}
@ -190,14 +182,14 @@ public class FrequentTaskTracker extends TaskTracker {
// 必须先持久化持久化成功才能 dispatch否则会导致后续报错因为DB中没有这个taskId对应的记录会各种报错
if (!taskPersistenceService.save(newRootTask)) {
log.error("[TaskTracker-{}] Launcher create new root task failed.", instanceId);
log.error("[FQTaskTracker-{}] Launcher create new root task failed.", instanceId);
processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED");
return;
}
// 生成记录信息必须保证持久化成功才能生成该记录否则会导致 LAUNCH_FAILED 错误
SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder();
timeHolder.startTime = timeHolder.lastActiveTime = System.currentTimeMillis();
timeHolder.startTime = System.currentTimeMillis();
subInstanceId2TimeHolder.put(subInstanceId, timeHolder);
dispatchTask(newRootTask, myAddress);
@ -208,7 +200,7 @@ public class FrequentTaskTracker extends TaskTracker {
try {
innerRun();
}catch (Exception e) {
log.error("[TaskTracker-{}] launch task failed.", instanceId, e);
log.error("[FQTaskTracker-{}] launch task failed.", instanceId, e);
}
}
}
@ -218,8 +210,6 @@ public class FrequentTaskTracker extends TaskTracker {
*/
private class Checker implements Runnable {
private static final long HEARTBEAT_TIMEOUT_MS = 60000;
@Override
public void run() {
@ -231,7 +221,7 @@ public class FrequentTaskTracker extends TaskTracker {
checkStatus();
reportStatus();
}catch (Exception e) {
log.warn("[TaskTracker-{}] check and report status failed.", instanceId, e);
log.warn("[FQTaskTracker-{}] check and report status failed.", instanceId, e);
}
}
@ -249,7 +239,6 @@ public class FrequentTaskTracker extends TaskTracker {
SubInstanceTimeHolder timeHolder = entry.getValue();
long executeTimeout = nowTS - timeHolder.startTime;
long heartbeatTimeout = nowTS - timeHolder.lastActiveTime;
// 超时包含总运行时间超时和心跳包超时直接判定为失败
if (executeTimeout > instanceTimeoutMS) {
@ -257,11 +246,6 @@ public class FrequentTaskTracker extends TaskTracker {
continue;
}
if (heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) {
onFinished(subInstanceId, false, "HEARTBEAT_TIMEOUT", iterator);
continue;
}
// 查看执行情况
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(subInstanceId);
@ -312,7 +296,7 @@ public class FrequentTaskTracker extends TaskTracker {
}
// 舍去一切重试机制反正超时就失败
}
log.debug("[TaskTracker-{}] check status using {}.", instanceId, stopwatch);
log.debug("[FQTaskTracker-{}] check status using {}.", instanceId, stopwatch);
}
private void reportStatus() {
@ -387,7 +371,6 @@ public class FrequentTaskTracker extends TaskTracker {
private static class SubInstanceTimeHolder {
private long startTime;
private long lastActiveTime;
}
}

View File

@ -166,7 +166,7 @@ public abstract class TaskTracker {
lastReportTime = taskOpt.get().getLastReportTime();
}else {
// 理论上不存在这种情况除非数据库异常
log.error("[TaskTracker-{}] can't find task by pkey(instanceId={}&taskId={}).", instanceId, instanceId, taskId);
log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
}
if (lastReportTime == null) {
@ -176,8 +176,8 @@ public abstract class TaskTracker {
// 过滤过期的请求潜在的集群时间一致性需求重试跨Worker时时间不一致可能导致问题
if (lastReportTime > reportTime) {
log.warn("[TaskTracker-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
instanceId, lastReportTime, reportTime, taskId, newStatus);
log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
instanceId, subInstanceId, lastReportTime, reportTime, taskId, newStatus);
return;
}
@ -215,7 +215,7 @@ public abstract class TaskTracker {
boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
if (retryTask) {
log.info("[TaskTracker-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, taskId);
log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId);
return;
}
}
@ -227,12 +227,12 @@ public abstract class TaskTracker {
boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);
if (!updateResult) {
log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId);
log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId);
}
} catch (InterruptedException ignore) {
} catch (Exception e) {
log.warn("[TaskTracker-{}] update task status failed.", instanceId, e);
log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e);
} finally {
segmentLock.unlock(lockId);
}