From 755e056143d92f59dd44ddf92fb37044c3b0ac2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctjq=E2=80=9D?= Date: Sun, 18 Oct 2020 22:49:14 +0800 Subject: [PATCH] feat: remove shade & ready to release --- powerjob-client/pom.xml | 9 +++----- .../core/ha/ProcessorTrackerStatusHolder.java | 22 +++++++++++-------- .../tracker/processor/ProcessorTracker.java | 2 +- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index 1638d059..987c1d41 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -35,12 +35,6 @@ com.github.kfcfans powerjob-common ${powerjob.common.version} - - - com.typesafe.akka - * - - @@ -54,6 +48,8 @@ + + diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java index d9ba0ce6..94b7f6dc 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java @@ -30,22 +30,26 @@ public class ProcessorTrackerStatusHolder { }); } + /** + * 根据地址获取 ProcessorTracker 的状态 + * @param address IP:Port + * @return status + */ public ProcessorTrackerStatus getProcessorTrackerStatus(String address) { - return address2Status.get(address); + // remove 前突然收到了 PT 心跳同时立即被派发才可能出现这种情况,0.001% 概率 + return address2Status.computeIfAbsent(address, ignore -> { + log.warn("[ProcessorTrackerStatusHolder] unregistered worker: {}", address); + ProcessorTrackerStatus processorTrackerStatus = new ProcessorTrackerStatus(); + processorTrackerStatus.init(address); + return processorTrackerStatus; + }); } /** * 根据 ProcessorTracker 的心跳更新状态 */ public void updateStatus(ProcessorTrackerStatusReportReq heartbeatReq) { - // remove 前突然收到了 PT 心跳同时立即被派发才可能出现这种情况,0.001% 概率 - ProcessorTrackerStatus pts = address2Status.computeIfAbsent(heartbeatReq.getAddress(), ignore-> { - log.warn("[ProcessorTrackerStatusHolder] unregistered worker's heartbeat request: {}", heartbeatReq); - ProcessorTrackerStatus processorTrackerStatus = new ProcessorTrackerStatus(); - processorTrackerStatus.init(heartbeatReq.getAddress()); - return processorTrackerStatus; - }); - pts.update(heartbeatReq); + getProcessorTrackerStatus(heartbeatReq.getAddress()).update(heartbeatReq); } /** diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index b481e0c7..c6b536b9 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -65,7 +65,7 @@ public class ProcessorTracker { private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128; // 长时间空闲的 ProcessorTracker 会发起销毁请求 - private static final long MAX_IDLE_TIME = 300000; + private static final long MAX_IDLE_TIME = 120000; // 当 ProcessorTracker 出现根本性错误(比如 Processor 创建失败,所有的任务直接失败) private boolean lethal = false;