diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 38d4937f..b481e0c7 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -63,7 +63,7 @@ public class ProcessorTracker { private ThreadPoolExecutor threadPool; private ScheduledExecutorService timingPool; - private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100; + private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128; // 长时间空闲的 ProcessorTracker 会发起销毁请求 private static final long MAX_IDLE_TIME = 300000; diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index 5d43a969..27d3af71 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -105,14 +105,6 @@ public class FrequentTaskTracker extends TaskTracker { scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS); } - @Override - public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) { - super.updateTaskStatus(subInstanceId, taskId, newStatus, reportTime, result); - // 更新 LastActiveTime - SubInstanceTimeHolder timeHolder = subInstanceId2TimeHolder.get(subInstanceId); - timeHolder.lastActiveTime = Math.max(reportTime, timeHolder.lastActiveTime); - } - @Override public InstanceDetail fetchRunningStatus() { InstanceDetail detail = new InstanceDetail(); @@ -181,7 +173,7 @@ public class FrequentTaskTracker extends TaskTracker { if (maxInstanceNum > 0) { if (timeExpressionType == TimeExpressionType.FIX_RATE) { if (subInstanceId2TimeHolder.size() > maxInstanceNum) { - log.warn("[TaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId); + log.warn("[FQTaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId); processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE"); return; } @@ -190,14 +182,14 @@ public class FrequentTaskTracker extends TaskTracker { // 必须先持久化,持久化成功才能 dispatch,否则会导致后续报错(因为DB中没有这个taskId对应的记录,会各种报错) if (!taskPersistenceService.save(newRootTask)) { - log.error("[TaskTracker-{}] Launcher create new root task failed.", instanceId); + log.error("[FQTaskTracker-{}] Launcher create new root task failed.", instanceId); processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED"); return; } // 生成记录信息(必须保证持久化成功才能生成该记录,否则会导致 LAUNCH_FAILED 错误) SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder(); - timeHolder.startTime = timeHolder.lastActiveTime = System.currentTimeMillis(); + timeHolder.startTime = System.currentTimeMillis(); subInstanceId2TimeHolder.put(subInstanceId, timeHolder); dispatchTask(newRootTask, myAddress); @@ -208,7 +200,7 @@ public class FrequentTaskTracker extends TaskTracker { try { innerRun(); }catch (Exception e) { - log.error("[TaskTracker-{}] launch task failed.", instanceId, e); + log.error("[FQTaskTracker-{}] launch task failed.", instanceId, e); } } } @@ -218,8 +210,6 @@ public class FrequentTaskTracker extends TaskTracker { */ private class Checker implements Runnable { - private static final long HEARTBEAT_TIMEOUT_MS = 60000; - @Override public void run() { @@ -231,7 +221,7 @@ public class FrequentTaskTracker extends TaskTracker { checkStatus(); reportStatus(); }catch (Exception e) { - log.warn("[TaskTracker-{}] check and report status failed.", instanceId, e); + log.warn("[FQTaskTracker-{}] check and report status failed.", instanceId, e); } } @@ -249,7 +239,6 @@ public class FrequentTaskTracker extends TaskTracker { SubInstanceTimeHolder timeHolder = entry.getValue(); long executeTimeout = nowTS - timeHolder.startTime; - long heartbeatTimeout = nowTS - timeHolder.lastActiveTime; // 超时(包含总运行时间超时和心跳包超时),直接判定为失败 if (executeTimeout > instanceTimeoutMS) { @@ -257,11 +246,6 @@ public class FrequentTaskTracker extends TaskTracker { continue; } - if (heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) { - onFinished(subInstanceId, false, "HEARTBEAT_TIMEOUT", iterator); - continue; - } - // 查看执行情况 InstanceStatisticsHolder holder = getInstanceStatisticsHolder(subInstanceId); @@ -312,7 +296,7 @@ public class FrequentTaskTracker extends TaskTracker { } // 舍去一切重试机制,反正超时就失败 } - log.debug("[TaskTracker-{}] check status using {}.", instanceId, stopwatch); + log.debug("[FQTaskTracker-{}] check status using {}.", instanceId, stopwatch); } private void reportStatus() { @@ -387,7 +371,6 @@ public class FrequentTaskTracker extends TaskTracker { private static class SubInstanceTimeHolder { private long startTime; - private long lastActiveTime; } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index 1974e0c7..bb9bb88f 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -166,7 +166,7 @@ public abstract class TaskTracker { lastReportTime = taskOpt.get().getLastReportTime(); }else { // 理论上不存在这种情况,除非数据库异常 - log.error("[TaskTracker-{}] can't find task by pkey(instanceId={}&taskId={}).", instanceId, instanceId, taskId); + log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId); } if (lastReportTime == null) { @@ -176,8 +176,8 @@ public abstract class TaskTracker { // 过滤过期的请求(潜在的集群时间一致性需求,重试跨Worker时,时间不一致可能导致问题) if (lastReportTime > reportTime) { - log.warn("[TaskTracker-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.", - instanceId, lastReportTime, reportTime, taskId, newStatus); + log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.", + instanceId, subInstanceId, lastReportTime, reportTime, taskId, newStatus); return; } @@ -215,7 +215,7 @@ public abstract class TaskTracker { boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity); if (retryTask) { - log.info("[TaskTracker-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, taskId); + log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId); return; } } @@ -227,12 +227,12 @@ public abstract class TaskTracker { boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result); if (!updateResult) { - log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId); + log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId); } } catch (InterruptedException ignore) { } catch (Exception e) { - log.warn("[TaskTracker-{}] update task status failed.", instanceId, e); + log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e); } finally { segmentLock.unlock(lockId); }