mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
fix: PADDLING not work
This commit is contained in:
parent
0bb069fa5b
commit
f9dd8d7713
@ -10,7 +10,7 @@
|
|||||||
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<artifactId>powerjob-worker</artifactId>
|
<artifactId>powerjob-worker</artifactId>
|
||||||
<version>5.1.0-bugfix</version>
|
<version>5.1.0-bugfix2-SNAPSHOT</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
@ -41,6 +41,7 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -478,6 +479,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
|||||||
long currentDispatchNum = 0;
|
long currentDispatchNum = 0;
|
||||||
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
|
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
|
||||||
AtomicInteger index = new AtomicInteger(0);
|
AtomicInteger index = new AtomicInteger(0);
|
||||||
|
AtomicBoolean skipThisRound = new AtomicBoolean(false);
|
||||||
|
|
||||||
// 4. 循环查询数据库,获取需要派发的任务
|
// 4. 循环查询数据库,获取需要派发的任务
|
||||||
while (maxDispatchNum > currentDispatchNum) {
|
while (maxDispatchNum > currentDispatchNum) {
|
||||||
@ -490,8 +492,15 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
|||||||
// 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address
|
// 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address
|
||||||
String ptAddress = task.getAddress();
|
String ptAddress = task.getAddress();
|
||||||
if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
|
if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
|
||||||
if (taskNeedByPassTaskTracker(availablePtIps)) {
|
if (taskNeedByPassTaskTracker()) {
|
||||||
|
int loopTime = 0;
|
||||||
do {
|
do {
|
||||||
|
loopTime++;
|
||||||
|
if (loopTime > 3) {
|
||||||
|
log.warn("[TaskTracker-{}] The cluster has no available workers other than master, so this round dispatch is skipped.", instanceId);
|
||||||
|
skipThisRound.set(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
|
ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
|
||||||
} while (workerRuntime.getWorkerAddress().equals(ptAddress));
|
} while (workerRuntime.getWorkerAddress().equals(ptAddress));
|
||||||
} else {
|
} else {
|
||||||
@ -501,6 +510,10 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
|||||||
dispatchTask(task, ptAddress);
|
dispatchTask(task, ptAddress);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (skipThisRound.get()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// 数量不足 或 查询失败,则终止循环
|
// 数量不足 或 查询失败,则终止循环
|
||||||
if (needDispatchTasks.size() < dbQueryLimit) {
|
if (needDispatchTasks.size() < dbQueryLimit) {
|
||||||
break;
|
break;
|
||||||
@ -510,18 +523,8 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
|||||||
log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
|
log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private boolean taskNeedByPassTaskTracker() {
|
||||||
* padding的生效条件: 在map || mapReduce 的情况下, 且是该appId的worker是 非单机运行时,才生效。
|
|
||||||
* fix: 当该appId的worker是单机运行 且 padding时, 导致Dispatcher分发任务处于死循环中, 致使无法分发任务,状态一直为运行中,
|
|
||||||
* 且该线程不能通过停止任务的方式去停止,只能通过重启该work实例的方式释放该线程。
|
|
||||||
*/
|
|
||||||
private boolean taskNeedByPassTaskTracker(List<String> availablePtIps) {
|
|
||||||
if (ExecuteType.MAP.equals(executeType) || ExecuteType.MAP_REDUCE.equals(executeType)) {
|
if (ExecuteType.MAP.equals(executeType) || ExecuteType.MAP_REDUCE.equals(executeType)) {
|
||||||
|
|
||||||
if (availablePtIps.size() <= 1) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior());
|
return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior());
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user