diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/IdleBugTestProcessor.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/IdleBugTestProcessor.java
new file mode 100644
index 00000000..abe9f9c2
--- /dev/null
+++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/IdleBugTestProcessor.java
@@ -0,0 +1,46 @@
+package tech.powerjob.samples.processors.test;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import tech.powerjob.common.utils.CommonUtils;
+import tech.powerjob.worker.core.processor.ProcessResult;
+import tech.powerjob.worker.core.processor.TaskContext;
+import tech.powerjob.worker.core.processor.TaskResult;
+import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * 测试长时间执行的任务 idle 导致 reduce 不执行
+ *
+ * @author tjq
+ * @since 2024/11/21
+ */
+@Slf4j
+@Component
+public class IdleBugTestProcessor implements MapReduceProcessor {
+
+ @Override
+ public ProcessResult process(TaskContext context) throws Exception {
+ if (isRootTask()) {
+ map(Lists.newArrayList("1", "2", "3", "4", "5", "6", "7"), "L1_TASK");
+ return new ProcessResult(true, "MAP_SUCCESS");
+ }
+
+ Object subTask = context.getSubTask();
+ log.info("[IdleBugTestProcessor] subTask:={}, start to process!", subTask);
+
+ // 同步修改 idle 阈值
+ CommonUtils.easySleep(ThreadLocalRandom.current().nextInt(40001, 60000));
+ log.info("[IdleBugTestProcessor] subTask:={}, finished process", subTask);
+ return new ProcessResult(true, "SUCCESS_" + subTask);
+ }
+
+ @Override
+ public ProcessResult reduce(TaskContext context, List taskResults) {
+ log.info("[IdleBugTestProcessor] [REDUCE] REDUCE!!!");
+ return new ProcessResult(true, "SUCCESS");
+ }
+}
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java
index 3d7650cd..c9daebd6 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java
@@ -265,14 +265,21 @@ public class ProcessorTracker {
} else {
long idleTime = System.currentTimeMillis() - lastIdleTime;
if (idleTime > MAX_IDLE_TIME) {
- log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime);
- // 不可靠通知,如果该请求失败,则整个任务处理集群缺失一个 ProcessorTracker,影响可接受
- ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
- statusReportReq.setAddress(workerRuntime.getWorkerAddress());
- TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
- destroy();
- return;
+ boolean shouldDestroyWhenIdle = shouldDestroyWhenIdle();
+ log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, shouldDestroyWhenIdle: {}", instanceId, idleTime, shouldDestroyWhenIdle);
+
+ if (shouldDestroyWhenIdle) {
+
+ log.warn("[ProcessorTracker-{}] it's time to tell TaskTracker and then destroy self.", instanceId);
+
+ // 不可靠通知,如果该请求失败,则整个任务处理集群缺失一个 ProcessorTracker,影响可接受
+ ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
+ statusReportReq.setAddress(workerRuntime.getWorkerAddress());
+ TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
+ destroy();
+ return;
+ }
}
}
}
@@ -300,6 +307,22 @@ public class ProcessorTracker {
}
+ /**
+ * 空闲的时候是否需要自我销毁
+ * @return true or false
+ */
+ private boolean shouldDestroyWhenIdle() {
+ /*
+ https://github.com/PowerJob/PowerJob/issues/1033
+ map 情况下,如果子任务执行较长,任务末期可能出现某一个节点的任务仍在执行,其他机器都已经无任务可执行,被 idle 逻辑关闭节点。如果不幸在‘生成reduce任务后并派发前’关闭了 TaskTracker 所在节点的 PT,那 reduce 任务就会直接失败
+ 解决方案:同 TT 节点的 PT,本身不存在分布式不一致问题,因此不需要 idle 直接关闭 PT 的机制
+ */
+ if (taskTrackerAddress.equalsIgnoreCase(workerRuntime.getWorkerAddress())) {
+ return false;
+ }
+ return true;
+ }
+
/**
* 计算线程池大小
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java
index 2ad7ac80..6fc8b1ea 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java
@@ -232,6 +232,8 @@ public class CommonTaskTracker extends HeavyTaskTracker {
} else {
+ log.info("[TaskTracker-{}] all subTask has done, start to create final task", instanceId);
+
// 不存在,代表前置任务刚刚执行完毕,需要创建 lastTask,最终任务必须在本机执行!
TaskDO newLastTask = new TaskDO();
newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
index 443cbfd1..fe03a5fc 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
@@ -298,7 +298,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
* @param heartbeatReq ProcessorTracker(任务的执行管理器)发来的心跳包,包含了其当前状态
*/
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
- log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
+ log.debug("[TaskTracker-{}] receive PT's heartbeat: {}", instanceId, heartbeatReq);
ptStatusHolder.updateStatus(heartbeatReq);
// 上报空闲,检查是否已经接收到全部该 ProcessorTracker 负责的任务