From dfd1fd069b8d99ff343caeeac544a05d2b3891ec Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 10 Sep 2022 09:27:57 +0800 Subject: [PATCH] fix: invalid random when JobInfo's maxWorkerCount is a small value #449 --- .../powerjob/server/core/DispatchService.java | 16 +--------------- .../remote/worker/WorkerClusterQueryService.java | 14 ++++++++++++-- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java index 44d36d60..881c5061 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java @@ -146,9 +146,8 @@ public class DispatchService { // 构造任务调度请求 ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList); - // 发送请求(不可靠,需要一个后台线程定期轮询状态) - WorkerInfo taskTracker = selectTaskTracker(jobInfo, suitableWorkers); + WorkerInfo taskTracker = suitableWorkers.get(0); String taskTrackerAddress = taskTracker.getAddress(); transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req); @@ -196,17 +195,4 @@ public class DispatchService { req.setThreadConcurrency(jobInfo.getConcurrency()); return req; } - - private WorkerInfo selectTaskTracker(JobInfoDO jobInfo, List workerInfos) { - DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy()); - switch (dispatchStrategy) { - case HEALTH_FIRST: - return workerInfos.get(0); - case RANDOM: - return workerInfos.get(ThreadLocalRandom.current().nextInt(workerInfos.size())); - default: - } - // impossible, indian java - return workerInfos.get(0); - } } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java index da555c0b..04002cb7 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java @@ -4,6 +4,7 @@ import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import tech.powerjob.common.enums.DispatchStrategy; import tech.powerjob.common.model.DeployedContainerInfo; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.extension.WorkerFilter; @@ -44,8 +45,17 @@ public class WorkerClusterQueryService { workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo)); - // 按健康度排序 - workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore()); + DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy()); + switch (dispatchStrategy) { + case RANDOM: + Collections.shuffle(workers); + break; + case HEALTH_FIRST: + workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore()); + break; + default: + // do nothing + } // 限定集群大小(0代表不限制) if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {