From a67adf96017de69810d5e82f78ef3de9d97567e7 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Oct 2020 16:54:47 +0800 Subject: [PATCH] fix: never update lastActiveTime which lead to TIMEOUT for frequentJob(thanks @Y) --- .../processors/BroadcastProcessorDemo.java | 1 + .../worker/actors/TaskTrackerActor.java | 2 +- .../core/tracker/task/FrequentTaskTracker.java | 17 +++++++++++++++-- .../worker/core/tracker/task/TaskTracker.java | 5 +++-- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java index 9d7e898f..73bb3ec7 100644 --- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java @@ -36,6 +36,7 @@ public class BroadcastProcessorDemo extends BroadcastProcessor { public ProcessResult process(TaskContext taskContext) throws Exception { System.out.println("===== BroadcastProcessorDemo#process ======"); taskContext.getOmsLogger().info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost()); + Thread.sleep(45 * 1000); return new ProcessResult(true); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java index cbb69d8d..4b53ae84 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java @@ -65,7 +65,7 @@ public class TaskTrackerActor extends AbstractActor { taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult()); } - taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult()); + taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult()); } /** diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index ffc448fa..b575ec4b 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; +import javax.annotation.Nullable; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -104,6 +105,14 @@ public class FrequentTaskTracker extends TaskTracker { scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS); } + @Override + public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) { + super.updateTaskStatus(subInstanceId, taskId, newStatus, reportTime, result); + // 更新 LastActiveTime + SubInstanceTimeHolder timeHolder = subInstanceId2TimeHolder.get(subInstanceId); + timeHolder.lastActiveTime = Math.max(reportTime, timeHolder.lastActiveTime); + } + @Override public InstanceDetail fetchRunningStatus() { InstanceDetail detail = new InstanceDetail(); @@ -243,9 +252,13 @@ public class FrequentTaskTracker extends TaskTracker { long heartbeatTimeout = nowTS - timeHolder.lastActiveTime; // 超时(包含总运行时间超时和心跳包超时),直接判定为失败 - if (executeTimeout > instanceTimeoutMS || heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) { + if (executeTimeout > instanceTimeoutMS) { + onFinished(subInstanceId, false, "RUNNING_TIMEOUT", iterator); + continue; + } - onFinished(subInstanceId, false, "TIMEOUT", iterator); + if (heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) { + onFinished(subInstanceId, false, "HEARTBEAT_TIMEOUT", iterator); continue; } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index a34401c4..1974e0c7 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -138,12 +138,13 @@ public abstract class TaskTracker { /** * 更新Task状态 * V1.0.0 -> V1.0.1(e405e283ad7f97b0b4e5d369c7de884c0caf9192) 锁方案变更,从 synchronized (taskId.intern()) 修改为分段锁,能大大减少内存占用,损失的只有理论并发度而已 + * @param subInstanceId 子任务实例ID * @param taskId task的ID(task为任务实例的执行单位) * @param newStatus task的新状态 * @param reportTime 上报时间 * @param result task的执行结果,未执行完成时为空 */ - public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) { + public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) { if (finished.get()) { return; @@ -278,7 +279,7 @@ public abstract class TaskTracker { List unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress); if (!CollectionUtils.isEmpty(unfinishedTask)) { log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask); - unfinishedTask.forEach(task -> updateTaskStatus(task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result")); + unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result")); } } }