mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
fix: NPE when some app has on worker connected
This commit is contained in:
commit
9949e23bc4
@ -1,14 +1,14 @@
|
|||||||
package tech.powerjob.server.remote.worker;
|
package tech.powerjob.server.remote.worker;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
import tech.powerjob.common.model.DeployedContainerInfo;
|
import tech.powerjob.common.model.DeployedContainerInfo;
|
||||||
import tech.powerjob.server.common.module.WorkerInfo;
|
import tech.powerjob.server.common.module.WorkerInfo;
|
||||||
import tech.powerjob.server.extension.WorkerFilter;
|
import tech.powerjob.server.extension.WorkerFilter;
|
||||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||||
import tech.powerjob.server.remote.server.redirector.DesignateServer;
|
import tech.powerjob.server.remote.server.redirector.DesignateServer;
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -34,6 +34,7 @@ public class WorkerClusterQueryService {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* get worker for job
|
* get worker for job
|
||||||
|
*
|
||||||
* @param jobInfo job
|
* @param jobInfo job
|
||||||
* @return worker cluster info, sorted by metrics desc
|
* @return worker cluster info, sorted by metrics desc
|
||||||
*/
|
*/
|
||||||
@ -44,7 +45,7 @@ public class WorkerClusterQueryService {
|
|||||||
workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));
|
workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));
|
||||||
|
|
||||||
// 按健康度排序
|
// 按健康度排序
|
||||||
workers.sort((o1, o2) -> o2 .getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
|
workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
|
||||||
|
|
||||||
// 限定集群大小(0代表不限制)
|
// 限定集群大小(0代表不限制)
|
||||||
if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
|
if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
|
||||||
@ -56,12 +57,13 @@ public class WorkerClusterQueryService {
|
|||||||
@DesignateServer
|
@DesignateServer
|
||||||
public List<WorkerInfo> getAllWorkers(Long appId) {
|
public List<WorkerInfo> getAllWorkers(Long appId) {
|
||||||
List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values());
|
List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values());
|
||||||
workers.sort((o1, o2) -> o2 .getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
|
workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
|
||||||
return workers;
|
return workers;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* get all alive workers
|
* get all alive workers
|
||||||
|
*
|
||||||
* @param appId appId
|
* @param appId appId
|
||||||
* @return alive workers
|
* @return alive workers
|
||||||
*/
|
*/
|
||||||
@ -71,8 +73,22 @@ public class WorkerClusterQueryService {
|
|||||||
return workers;
|
return workers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets worker info by address.
|
||||||
|
*
|
||||||
|
* @param appId the app id
|
||||||
|
* @param address the address
|
||||||
|
* @return the worker info by address
|
||||||
|
*/
|
||||||
public Optional<WorkerInfo> getWorkerInfoByAddress(Long appId, String address) {
|
public Optional<WorkerInfo> getWorkerInfoByAddress(Long appId, String address) {
|
||||||
return Optional.ofNullable(getWorkerInfosByAppId(appId).get(address));
|
// this may cause NPE while address value is null .
|
||||||
|
//return Optional.ofNullable(getWorkerInfosByAppId(appId).get(address));
|
||||||
|
final Map<String, WorkerInfo> workerInfosByAppId = getWorkerInfosByAppId(appId);
|
||||||
|
//add null check for both workerInfos Map and address
|
||||||
|
if (null != workerInfosByAppId && null != address) {
|
||||||
|
return Optional.ofNullable(workerInfosByAppId.get(address));
|
||||||
|
}
|
||||||
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
|
public Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
|
||||||
@ -81,6 +97,7 @@ public class WorkerClusterQueryService {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取某个应用容器的部署情况
|
* 获取某个应用容器的部署情况
|
||||||
|
*
|
||||||
* @param appId 应用ID
|
* @param appId 应用ID
|
||||||
* @param containerId 容器ID
|
* @param containerId 容器ID
|
||||||
* @return 部署情况
|
* @return 部署情况
|
||||||
@ -104,6 +121,7 @@ public class WorkerClusterQueryService {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* filter invalid worker for job
|
* filter invalid worker for job
|
||||||
|
*
|
||||||
* @param workerInfo worker info
|
* @param workerInfo worker info
|
||||||
* @param jobInfo job info
|
* @param jobInfo job info
|
||||||
* @return filter this worker when return true
|
* @return filter this worker when return true
|
||||||
|
Loading…
x
Reference in New Issue
Block a user