fix: 当该appId的worker是单机运行 且 padding时, 导致Dispatcher分发任务处于死循环中, 致使无法分发任务,状态一直为运行中

This commit is contained in:
shenkang 2024-05-30 11:11:46 +08:00
parent d44131128e
commit 4507a6a883

View File

@ -456,6 +456,8 @@ public abstract class HeavyTaskTracker extends TaskTracker {
// 数据库查询限制每次最多查询几个任务
private static final int DB_QUERY_LIMIT = 100;
private static final int STANDALONE_SIZE = 1;
@Override
public void run0() {
@ -490,7 +492,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
// 获取 ProcessorTracker 地址如果 Task 中自带了 Address则使用该 Address
String ptAddress = task.getAddress();
if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
if (taskNeedByPassTaskTracker()) {
if (taskNeedByPassTaskTracker(availablePtIps)) {
do {
ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
} while (workerRuntime.getWorkerAddress().equals(ptAddress));
@ -510,9 +512,15 @@ public abstract class HeavyTaskTracker extends TaskTracker {
log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
}
private boolean taskNeedByPassTaskTracker() {
/**
* padding的生效条件 在map || mapReduce 的情况下 且是该appId的worker是 非单机运行时才生效
* fix: 当该appId的worker是单机运行 padding时 导致Dispatcher分发任务处于死循环中 致使无法分发任务状态一直为运行中
* 且该线程不能通过停止任务的方式去停止只能通过重启该work实例的方式释放该线程
*/
private boolean taskNeedByPassTaskTracker(List<String> availablePtIps) {
if (ExecuteType.MAP.equals(executeType) || ExecuteType.MAP_REDUCE.equals(executeType)) {
return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior());
return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior()) &&
availablePtIps.size() != STANDALONE_SIZE;
}
return false;
}