diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index ae74a553..038ff954 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-worker - 5.1.0-bugfix + 5.1.0-bugfix2-SNAPSHOT jar diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 443cbfd1..797a8475 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -478,6 +479,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { long currentDispatchNum = 0; long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L; AtomicInteger index = new AtomicInteger(0); + AtomicBoolean skipThisRound = new AtomicBoolean(false); // 4. 循环查询数据库,获取需要派发的任务 while (maxDispatchNum > currentDispatchNum) { @@ -490,8 +492,15 @@ public abstract class HeavyTaskTracker extends TaskTracker { // 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address String ptAddress = task.getAddress(); if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) { - if (taskNeedByPassTaskTracker(availablePtIps)) { + if (taskNeedByPassTaskTracker()) { + int loopTime = 0; do { + loopTime++; + if (loopTime > 3) { + log.warn("[TaskTracker-{}] The cluster has no available workers other than master, so this round dispatch is skipped.", instanceId); + skipThisRound.set(true); + return; + } ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); } while (workerRuntime.getWorkerAddress().equals(ptAddress)); } else { @@ -501,6 +510,10 @@ public abstract class HeavyTaskTracker extends TaskTracker { dispatchTask(task, ptAddress); }); + if (skipThisRound.get()) { + break; + } + // 数量不足 或 查询失败,则终止循环 if (needDispatchTasks.size() < dbQueryLimit) { break; @@ -510,18 +523,8 @@ public abstract class HeavyTaskTracker extends TaskTracker { log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop()); } - /** - * padding的生效条件: 在map || mapReduce 的情况下, 且是该appId的worker是 非单机运行时,才生效。 - * fix: 当该appId的worker是单机运行 且 padding时, 导致Dispatcher分发任务处于死循环中, 致使无法分发任务,状态一直为运行中, - * 且该线程不能通过停止任务的方式去停止,只能通过重启该work实例的方式释放该线程。 - */ - private boolean taskNeedByPassTaskTracker(List availablePtIps) { + private boolean taskNeedByPassTaskTracker() { if (ExecuteType.MAP.equals(executeType) || ExecuteType.MAP_REDUCE.equals(executeType)) { - - if (availablePtIps.size() <= 1) { - return false; - } - return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior()); } return false;