fix: reduce Probabilistic non-execution #1033

This commit is contained in:
tjq 2024-11-21 22:11:44 +08:00
parent 0bb069fa5b
commit f44bd43d13
4 changed files with 79 additions and 8 deletions

View File

@ -0,0 +1,46 @@
package tech.powerjob.samples.processors.test;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* <a href="https://github.com/PowerJob/PowerJob/issues/1033">测试长时间执行的任务 idle 导致 reduce 不执行</a>
*
* @author tjq
* @since 2024/11/21
*/
@Slf4j
@Component
public class IdleBugTestProcessor implements MapReduceProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
if (isRootTask()) {
map(Lists.newArrayList("1", "2", "3", "4", "5", "6", "7"), "L1_TASK");
return new ProcessResult(true, "MAP_SUCCESS");
}
Object subTask = context.getSubTask();
log.info("[IdleBugTestProcessor] subTask:={}, start to process!", subTask);
// 同步修改 idle 阈值
CommonUtils.easySleep(ThreadLocalRandom.current().nextInt(40001, 60000));
log.info("[IdleBugTestProcessor] subTask:={}, finished process", subTask);
return new ProcessResult(true, "SUCCESS_" + subTask);
}
@Override
public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
log.info("[IdleBugTestProcessor] [REDUCE] REDUCE!!!");
return new ProcessResult(true, "SUCCESS");
}
}

View File

@ -265,14 +265,21 @@ public class ProcessorTracker {
} else {
long idleTime = System.currentTimeMillis() - lastIdleTime;
if (idleTime > MAX_IDLE_TIME) {
log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime);
// 不可靠通知如果该请求失败则整个任务处理集群缺失一个 ProcessorTracker影响可接受
ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
statusReportReq.setAddress(workerRuntime.getWorkerAddress());
TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
destroy();
return;
boolean shouldDestroyWhenIdle = shouldDestroyWhenIdle();
log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, shouldDestroyWhenIdle: {}", instanceId, idleTime, shouldDestroyWhenIdle);
if (shouldDestroyWhenIdle) {
log.warn("[ProcessorTracker-{}] it's time to tell TaskTracker and then destroy self.", instanceId);
// 不可靠通知如果该请求失败则整个任务处理集群缺失一个 ProcessorTracker影响可接受
ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
statusReportReq.setAddress(workerRuntime.getWorkerAddress());
TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
destroy();
return;
}
}
}
}
@ -300,6 +307,22 @@ public class ProcessorTracker {
}
/**
* 空闲的时候是否需要自我销毁
* @return true or false
*/
private boolean shouldDestroyWhenIdle() {
/*
https://github.com/PowerJob/PowerJob/issues/1033
map 情况下如果子任务执行较长任务末期可能出现某一个节点的任务仍在执行其他机器都已经无任务可执行 idle 逻辑关闭节点如果不幸在生成reduce任务后并派发前关闭了 TaskTracker 所在节点的 PT reduce 任务就会直接失败
解决方案 TT 节点的 PT本身不存在分布式不一致问题因此不需要 idle 直接关闭 PT 的机制
*/
if (taskTrackerAddress.equalsIgnoreCase(workerRuntime.getWorkerAddress())) {
return false;
}
return true;
}
/**
* 计算线程池大小

View File

@ -232,6 +232,8 @@ public class CommonTaskTracker extends HeavyTaskTracker {
} else {
log.info("[TaskTracker-{}] all subTask has done, start to create final task", instanceId);
// 不存在代表前置任务刚刚执行完毕需要创建 lastTask最终任务必须在本机执行
TaskDO newLastTask = new TaskDO();
newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);

View File

@ -298,7 +298,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
* @param heartbeatReq ProcessorTracker任务的执行管理器发来的心跳包包含了其当前状态
*/
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
log.debug("[TaskTracker-{}] receive PT's heartbeat: {}", instanceId, heartbeatReq);
ptStatusHolder.updateStatus(heartbeatReq);
// 上报空闲检查是否已经接收到全部该 ProcessorTracker 负责的任务