diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index a10b8a92..adf8cc4c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -314,7 +314,7 @@ public class WorkflowInstanceManager { } /** - * 允许任务实例 + * 运行任务实例 * 需要将创建和运行任务实例分离,否则在秒失败情况下,会发生DAG覆盖更新的问题 * @param jobId 任务ID * @param instanceId 任务实例ID diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index e12f48cc..15876cb6 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -55,8 +55,10 @@ public class ProcessorTracker { private OmsLogger omsLogger; // ProcessResult 上报失败的重试队列 private Queue statusReportRetryQueue; - // 上一次空闲时间 + // 上一次空闲时间(用于闲置判定) private long lastIdleTime; + // 上次完成任务数量(用于闲置判定) + private long lastCompletedTaskCount; private String taskTrackerAddress; private ActorSelection taskTrackerActorRef; @@ -88,6 +90,7 @@ public class ProcessorTracker { this.omsLogger = new OmsServerLogger(instanceId); this.statusReportRetryQueue = Queues.newLinkedBlockingQueue(); this.lastIdleTime = -1L; + this.lastCompletedTaskCount = 0L; // 初始化 线程池,TimingPool 启动的任务会检查 ThreadPool,所以必须先初始化线程池,否则NPE initThreadPool(); @@ -239,8 +242,9 @@ public class ProcessorTracker { } // 判断线程池活跃状态,长时间空闲则上报 TaskTracker 请求检查 - if (threadPool.getActiveCount() > 0) { + if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) { lastIdleTime = -1; + lastCompletedTaskCount = threadPool.getCompletedTaskCount(); }else { if (lastIdleTime == -1) { lastIdleTime = System.currentTimeMillis(); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index 55c38634..15827976 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -446,7 +446,7 @@ public abstract class TaskTracker { // 3. 避免大查询,分批派发任务 long currentDispatchNum = 0; - long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2; + long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L; AtomicInteger index = new AtomicInteger(0); // 4. 循环查询数据库,获取需要派发的任务