From 57627305faa035374e4b82195023e2462eda37ee Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 22 Nov 2024 21:32:13 +0800 Subject: [PATCH] fix: Repeated execution after broadcast worker node down #1003 --- .../worker/core/tracker/task/heavy/CommonTaskTracker.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java index 6fc8b1ea..20a88a3b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java @@ -296,8 +296,10 @@ public class CommonTaskTracker extends HeavyTaskTracker { // 6.2 定期检查 -> 重新执行被派发到宕机ProcessorTracker上的任务 List disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers(); if (!disconnectedPTs.isEmpty()) { - log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs); - if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) { + // 广播任务节点丢失后若直接移除 IP 重试,后续会派发到其他节点,导致重复执行,因此此处不能重试 https://github.com/PowerJob/PowerJob/issues/1003 + boolean needRetry = !ExecuteType.BROADCAST.equals(executeType); + log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}, needRetry: {}.", instanceId, disconnectedPTs, needRetry); + if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, needRetry)) { ptStatusHolder.remove(disconnectedPTs); log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs); }