From 3ecefd22cb4b0fb7e331b6da743f19172832dc32 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 2 Jan 2021 11:31:48 +0800 Subject: [PATCH] fix: the bug of idle check #146 --- .../server/service/workflow/WorkflowInstanceManager.java | 2 +- .../worker/core/tracker/processor/ProcessorTracker.java | 8 ++++++-- .../powerjob/worker/core/tracker/task/TaskTracker.java | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index a10b8a92..adf8cc4c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -314,7 +314,7 @@ public class WorkflowInstanceManager { } /** - * 允许任务实例 + * 运行任务实例 * 需要将创建和运行任务实例分离,否则在秒失败情况下,会发生DAG覆盖更新的问题 * @param jobId 任务ID * @param instanceId 任务实例ID diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index e12f48cc..15876cb6 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -55,8 +55,10 @@ public class ProcessorTracker { private OmsLogger omsLogger; // ProcessResult 上报失败的重试队列 private Queue statusReportRetryQueue; - // 上一次空闲时间 + // 上一次空闲时间(用于闲置判定) private long lastIdleTime; + // 上次完成任务数量(用于闲置判定) + private long lastCompletedTaskCount; private String taskTrackerAddress; private ActorSelection taskTrackerActorRef; @@ -88,6 +90,7 @@ public class ProcessorTracker { this.omsLogger = new OmsServerLogger(instanceId); this.statusReportRetryQueue = Queues.newLinkedBlockingQueue(); this.lastIdleTime = -1L; + this.lastCompletedTaskCount = 0L; // 初始化 线程池,TimingPool 启动的任务会检查 ThreadPool,所以必须先初始化线程池,否则NPE initThreadPool(); @@ -239,8 +242,9 @@ public class ProcessorTracker { } // 判断线程池活跃状态,长时间空闲则上报 TaskTracker 请求检查 - if (threadPool.getActiveCount() > 0) { + if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) { lastIdleTime = -1; + lastCompletedTaskCount = threadPool.getCompletedTaskCount(); }else { if (lastIdleTime == -1) { lastIdleTime = System.currentTimeMillis(); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index 55c38634..15827976 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -446,7 +446,7 @@ public abstract class TaskTracker { // 3. 避免大查询,分批派发任务 long currentDispatchNum = 0; - long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2; + long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L; AtomicInteger index = new AtomicInteger(0); // 4. 循环查询数据库,获取需要派发的任务