From ac1b1fe0c8cf1ebffed5aeb72ab282901550d331 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 10 Sep 2022 09:55:26 +0800 Subject: [PATCH] fix: limit worker num failed in map/mapreduce job #450 --- .../common/request/ServerScheduleJobReq.java | 5 ++ .../powerjob/server/core/DispatchService.java | 1 + .../core/ha/ProcessorTrackerStatusHolder.java | 54 +++++++++++++++++-- .../worker/core/tracker/task/TaskTracker.java | 15 +++--- 4 files changed, 65 insertions(+), 10 deletions(-) diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java index 93093236..92fef38e 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java @@ -20,6 +20,11 @@ public class ServerScheduleJobReq implements PowerSerializable { */ private List allWorkerAddress; + /** + * 最大机器数量 + */ + private Integer maxWorkerCount; + /* *********************** 任务相关属性 *********************** */ /** diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java index 881c5061..db0183f7 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java @@ -181,6 +181,7 @@ public class DispatchService { } req.setInstanceId(instanceInfo.getInstanceId()); req.setAllWorkerAddress(finalWorkersIpList); + req.setMaxWorkerCount(jobInfo.getMaxWorkerCount()); // 设置工作流ID req.setWfInstanceId(instanceInfo.getWfInstanceId()); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java index 1a67c378..b5721b91 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java @@ -17,10 +17,15 @@ import java.util.Map; @Slf4j public class ProcessorTrackerStatusHolder { + private final Long instanceId; + private final Integer maxWorkerCount; // ProcessorTracker的address(IP:Port) -> 状态 private final Map address2Status; - public ProcessorTrackerStatusHolder(List allWorkerAddress) { + public ProcessorTrackerStatusHolder(Long instanceId, Integer maxWorkerCount, List allWorkerAddress) { + + this.instanceId = instanceId; + this.maxWorkerCount = maxWorkerCount; address2Status = Maps.newConcurrentMap(); allWorkerAddress.forEach(address -> { @@ -38,7 +43,7 @@ public class ProcessorTrackerStatusHolder { public ProcessorTrackerStatus getProcessorTrackerStatus(String address) { // remove 前突然收到了 PT 心跳同时立即被派发才可能出现这种情况,0.001% 概率 return address2Status.computeIfAbsent(address, ignore -> { - log.warn("[ProcessorTrackerStatusHolder] unregistered worker: {}", address); + log.warn("[PTStatusHolder-{}] unregistered worker: {}", instanceId, address); ProcessorTrackerStatus processorTrackerStatus = new ProcessorTrackerStatus(); processorTrackerStatus.init(address); return processorTrackerStatus; @@ -90,9 +95,9 @@ public class ProcessorTrackerStatusHolder { /** * 注册新的执行节点 * @param address 新的执行节点地址 - * @return true: 注册成功 / false:已存在 + * @return true: register successfully / false: already exists */ - public boolean register(String address) { + private boolean registerOne(String address) { ProcessorTrackerStatus pts = address2Status.get(address); if (pts != null) { return false; @@ -100,9 +105,50 @@ public class ProcessorTrackerStatusHolder { pts = new ProcessorTrackerStatus(); pts.init(address); address2Status.put(address, pts); + log.info("[PTStatusHolder-{}] register new worker: {}", instanceId, address); return true; } + public void register(List workerIpList) { + if (endlessWorkerNum()) { + workerIpList.forEach(this::registerOne); + return; + } + List availableProcessorTrackers = getAvailableProcessorTrackers(); + int currentWorkerSize = availableProcessorTrackers.size(); + int needMoreNum = maxWorkerCount - currentWorkerSize; + if (needMoreNum <= 0) { + return; + } + + log.info("[PTStatusHolder-{}] currentWorkerSize: {}, needMoreNum: {}", instanceId, currentWorkerSize, needMoreNum); + + for (String newIp : workerIpList) { + boolean success = registerOne(newIp); + if (success) { + needMoreNum --; + } + if (needMoreNum <= 0) { + return; + } + } + } + + /** + * 检查是否需要动态加载新的执行器 + * @return check need more workers + */ + public boolean checkNeedMoreWorker() { + if (endlessWorkerNum()) { + return true; + } + return getAvailableProcessorTrackers().size() < maxWorkerCount; + } + + private boolean endlessWorkerNum() { + return maxWorkerCount == null || maxWorkerCount == 0; + } + public void remove(List addressList) { addressList.forEach(address2Status::remove); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java index fdf35fa9..492b8d7b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java @@ -124,7 +124,7 @@ public abstract class TaskTracker { // 保护性操作 instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency())); - this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress()); + this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress()); this.taskPersistenceService = workerRuntime.getTaskPersistenceService(); this.finished = new AtomicBoolean(false); // 只有工作流中的任务允许向工作流中追加上下文数据 @@ -562,6 +562,13 @@ public abstract class TaskTracker { protected class WorkerDetector implements Runnable { @Override public void run() { + + boolean needMoreWorker = ptStatusHolder.checkNeedMoreWorker(); + log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", instanceId, needMoreWorker); + if (!needMoreWorker) { + return; + } + String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress()); if (StringUtils.isEmpty(serverPath)) { log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId); @@ -575,11 +582,7 @@ public abstract class TaskTracker { } try { List workerList = JsonUtils.parseObject(response.getData(), new TypeReference>() {}); - workerList.forEach(address -> { - if (ptStatusHolder.register(address)) { - log.info("[TaskTracker-{}] detective new worker: {}", instanceId, address); - } - }); + ptStatusHolder.register(workerList); } catch (Exception e) { log.warn("[TaskTracker-{}] detective failed!", instanceId, e); }