fix: invalid random when JobInfo's maxWorkerCount is a small value #449

This commit is contained in:
tjq 2022-09-10 09:27:57 +08:00
parent 22db37cad9
commit dfd1fd069b
2 changed files with 13 additions and 17 deletions

View File

@ -146,9 +146,8 @@ public class DispatchService {
// 构造任务调度请求
ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList);
// 发送请求不可靠需要一个后台线程定期轮询状态
WorkerInfo taskTracker = selectTaskTracker(jobInfo, suitableWorkers);
WorkerInfo taskTracker = suitableWorkers.get(0);
String taskTrackerAddress = taskTracker.getAddress();
transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req);
@ -196,17 +195,4 @@ public class DispatchService {
req.setThreadConcurrency(jobInfo.getConcurrency());
return req;
}
private WorkerInfo selectTaskTracker(JobInfoDO jobInfo, List<WorkerInfo> workerInfos) {
DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
switch (dispatchStrategy) {
case HEALTH_FIRST:
return workerInfos.get(0);
case RANDOM:
return workerInfos.get(ThreadLocalRandom.current().nextInt(workerInfos.size()));
default:
}
// impossible, indian java
return workerInfos.get(0);
}
}

View File

@ -4,6 +4,7 @@ import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.DispatchStrategy;
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.extension.WorkerFilter;
@ -44,8 +45,17 @@ public class WorkerClusterQueryService {
workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));
// 按健康度排序
workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
switch (dispatchStrategy) {
case RANDOM:
Collections.shuffle(workers);
break;
case HEALTH_FIRST:
workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
break;
default:
// do nothing
}
// 限定集群大小0代表不限制
if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {