From 4507a6a883e5eb92c3492e20cf97838d1a548451 Mon Sep 17 00:00:00 2001 From: shenkang Date: Thu, 30 May 2024 11:11:46 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=BD=93=E8=AF=A5appId=E7=9A=84worker?= =?UTF-8?q?=E6=98=AF=E5=8D=95=E6=9C=BA=E8=BF=90=E8=A1=8C=20=E4=B8=94=20pad?= =?UTF-8?q?ding=E6=97=B6=EF=BC=8C=20=E5=AF=BC=E8=87=B4Dispatcher=E5=88=86?= =?UTF-8?q?=E5=8F=91=E4=BB=BB=E5=8A=A1=E5=A4=84=E4=BA=8E=E6=AD=BB=E5=BE=AA?= =?UTF-8?q?=E7=8E=AF=E4=B8=AD=EF=BC=8C=20=E8=87=B4=E4=BD=BF=E6=97=A0?= =?UTF-8?q?=E6=B3=95=E5=88=86=E5=8F=91=E4=BB=BB=E5=8A=A1=EF=BC=8C=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E4=B8=80=E7=9B=B4=E4=B8=BA=E8=BF=90=E8=A1=8C=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/tracker/task/heavy/HeavyTaskTracker.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index e2d383cd..8de2323c 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -456,6 +456,8 @@ public abstract class HeavyTaskTracker extends TaskTracker { // 数据库查询限制,每次最多查询几个任务 private static final int DB_QUERY_LIMIT = 100; + private static final int STANDALONE_SIZE = 1; + @Override public void run0() { @@ -490,7 +492,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { // 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address String ptAddress = task.getAddress(); if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) { - if (taskNeedByPassTaskTracker()) { + if (taskNeedByPassTaskTracker(availablePtIps)) { do { ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); } while (workerRuntime.getWorkerAddress().equals(ptAddress)); @@ -510,9 +512,15 @@ public abstract class HeavyTaskTracker extends TaskTracker { log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop()); } - private boolean taskNeedByPassTaskTracker() { + /** + * padding的生效条件: 在map || mapReduce 的情况下, 且是该appId的worker是 非单机运行时,才生效。 + * fix: 当该appId的worker是单机运行 且 padding时, 导致Dispatcher分发任务处于死循环中, 致使无法分发任务,状态一直为运行中, + * 且该线程不能通过停止任务的方式去停止,只能通过重启该work实例的方式释放该线程。 + */ + private boolean taskNeedByPassTaskTracker(List availablePtIps) { if (ExecuteType.MAP.equals(executeType) || ExecuteType.MAP_REDUCE.equals(executeType)) { - return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior()); + return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior()) && + availablePtIps.size() != STANDALONE_SIZE; } return false; }