refactor: optimize worker cluster service

This commit is contained in:
tjq 2021-02-20 00:08:23 +08:00
parent e24f20f5ba
commit ed94bef458
6 changed files with 97 additions and 85 deletions

View File

@ -36,6 +36,7 @@ public class ClusterStatusHolder {
/** /**
* 更新 worker 机器的状态 * 更新 worker 机器的状态
* @param heartbeat 心跳请求
*/ */
public void updateStatus(WorkerHeartbeat heartbeat) { public void updateStatus(WorkerHeartbeat heartbeat) {
@ -64,26 +65,12 @@ public class ClusterStatusHolder {
} }
} }
/**
 * Returns a fresh linked list holding every worker currently stored in the address-to-worker map.
 * @return a new list copied from the map's values
 */
public List<WorkerInfo> getAllWorkers() {
    final List<WorkerInfo> snapshot = Lists.newLinkedList(address2WorkerInfo.values());
    return snapshot;
}
/**
 * Looks up the worker stored under the given address.
 * @param address worker address used as the map key
 * @return whatever the address-to-worker map holds for that key (the map's "no mapping" result otherwise)
 */
public WorkerInfo getWorkerInfo(String address) {
    final WorkerInfo match = address2WorkerInfo.get(address);
    return match;
}
/** /**
* 获取当前连接的的机器详情 * 获取该集群所有的机器信息
* @return map * @return 地址: 机器信息
*/ */
public Map<String, WorkerInfo> getActiveWorkerInfo() { public Map<String, WorkerInfo> getAllWorkers() {
Map<String, WorkerInfo> res = Maps.newHashMap(); return address2WorkerInfo;
address2WorkerInfo.forEach((address, workerInfo) -> {
if (!workerInfo.timeout()) {
res.put(address, workerInfo);
}
});
return res;
} }
/** /**

View File

@ -1,12 +1,13 @@
package com.github.kfcfans.powerjob.server.remote.worker.cluster; package com.github.kfcfans.powerjob.server.remote.worker.cluster;
import com.github.kfcfans.powerjob.common.model.DeployedContainerInfo;
import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat; import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.*; import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* 管理 worker 集群信息 * 管理 worker 集群信息
@ -31,30 +32,6 @@ public class WorkerClusterManagerService {
clusterStatusHolder.updateStatus(heartbeat); clusterStatusHolder.updateStatus(heartbeat);
} }
/**
 * Fetches the information of every machine that has connected under a given app.
 * @param appId appId
 * @return list of all machines that have connected; an empty list when no cluster status is registered for the app
 */
public static List<WorkerInfo> getWorkerInfosByAppId(Long appId) {
    final ClusterStatusHolder holder = appId2ClusterStatus.get(appId);
    if (holder != null) {
        return holder.getAllWorkers();
    }
    // No cluster registered for this app yet: log it and answer with an empty result.
    log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
    return Collections.emptyList();
}
/**
 * Finds a single worker of an app by its address.
 * @param appId appId
 * @param address worker address
 * @return the worker wrapped in an Optional; empty when the app has no cluster status or the holder returns null for the address
 */
public static Optional<WorkerInfo> getWorkerInfo(Long appId, String address) {
    final ClusterStatusHolder holder = appId2ClusterStatus.get(appId);
    if (holder != null) {
        return Optional.ofNullable(holder.getWorkerInfo(address));
    }
    // No cluster registered for this app yet: log it and answer with an empty Optional.
    log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
    return Optional.empty();
}
/** /**
* 清理不需要的worker信息 * 清理不需要的worker信息
* @param usingAppIds 需要维护的appId其余的数据将被删除 * @param usingAppIds 需要维护的appId其余的数据将被删除
@ -64,32 +41,6 @@ public class WorkerClusterManagerService {
appId2ClusterStatus.entrySet().removeIf(entry -> !keys.contains(entry.getKey())); appId2ClusterStatus.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));
} }
/**
 * Fetches the workers currently connected to this server for an application.
 * @param appId application ID
 * @return worker information as returned by the app's cluster status holder; an empty map when the app has none
 */
public static Map<String, WorkerInfo> getActiveWorkerInfo(Long appId) {
    final ClusterStatusHolder holder = appId2ClusterStatus.get(appId);
    return holder == null
            ? Collections.<String, WorkerInfo>emptyMap()
            : holder.getActiveWorkerInfo();
}
/**
 * Fetches the deployment status of a container within an application.
 * @param appId application ID
 * @param containerId container ID
 * @return deployment status as returned by the app's cluster status holder; an empty list when the app has none
 */
public static List<DeployedContainerInfo> getDeployedContainerInfos(Long appId, Long containerId) {
    final ClusterStatusHolder holder = appId2ClusterStatus.get(appId);
    if (holder != null) {
        return holder.getDeployedContainerInfos(containerId);
    }
    return Collections.emptyList();
}
/** /**
* 清理缓存信息防止 OOM * 清理缓存信息防止 OOM
@ -98,7 +49,8 @@ public class WorkerClusterManagerService {
appId2ClusterStatus.values().forEach(ClusterStatusHolder::release); appId2ClusterStatus.values().forEach(ClusterStatusHolder::release);
} }
public static Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() { protected static Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
return appId2ClusterStatus; return appId2ClusterStatus;
} }
} }

View File

@ -1,12 +1,18 @@
package com.github.kfcfans.powerjob.server.remote.worker.cluster; package com.github.kfcfans.powerjob.server.remote.worker.cluster;
import com.github.kfcfans.powerjob.common.model.DeployedContainerInfo;
import com.github.kfcfans.powerjob.server.extension.WorkerFilter; import com.github.kfcfans.powerjob.server.extension.WorkerFilter;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer; import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional;
/** /**
* 获取 worker 集群信息 * 获取 worker 集群信息
@ -14,6 +20,7 @@ import java.util.List;
* @author tjq * @author tjq
* @since 2021/2/19 * @since 2021/2/19
*/ */
@Slf4j
@Service @Service
public class WorkerClusterQueryService { public class WorkerClusterQueryService {
@ -31,7 +38,7 @@ public class WorkerClusterQueryService {
*/ */
public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) { public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) {
List<WorkerInfo> workers = WorkerClusterManagerService.getWorkerInfosByAppId(jobInfo.getAppId()); List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());
workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo)); workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));
@ -47,11 +54,59 @@ public class WorkerClusterQueryService {
@DesignateServer(appIdParameterName = "appId") @DesignateServer(appIdParameterName = "appId")
public List<WorkerInfo> getAllWorkers(Long appId) { public List<WorkerInfo> getAllWorkers(Long appId) {
List<WorkerInfo> workers = WorkerClusterManagerService.getWorkerInfosByAppId(appId); List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values());
workers.sort((o1, o2) -> o2 .getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore()); workers.sort((o1, o2) -> o2 .getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
return workers; return workers;
} }
/**
 * Collects the workers of an application that have not timed out.
 * @param appId appId
 * @return a new list with every worker whose {@code timeout()} check is false; empty when none qualify
 */
public List<WorkerInfo> getAllAliveWorkers(Long appId) {
    final List<WorkerInfo> aliveWorkers = Lists.newLinkedList();
    for (WorkerInfo candidate : getWorkerInfosByAppId(appId).values()) {
        // Keep only the workers that are not flagged as timed out.
        if (!candidate.timeout()) {
            aliveWorkers.add(candidate);
        }
    }
    return aliveWorkers;
}
/**
 * Finds a single worker of an application by its address.
 * @param appId appId
 * @param address worker address; may be {@code null}, in which case nothing is looked up
 * @return the matching worker, or an empty Optional when the address is {@code null},
 *         the app has no registered cluster, or no worker is stored under the address
 */
public Optional<WorkerInfo> getWorkerInfoByAddress(Long appId, String address) {
    // Resolve the app's workers first so the "no worker for app" warning is still logged.
    final Map<String, WorkerInfo> workers = getWorkerInfosByAppId(appId);
    // NOTE(review): the backing map type is not visible here; a null-hostile map (e.g. a concurrent map)
    // would throw on a null key, and callers pass a task tracker address that may be unset -- confirm.
    if (address == null) {
        return Optional.empty();
    }
    return Optional.ofNullable(workers.get(address));
}
/**
 * Exposes the appId-to-cluster-status registry kept by {@link WorkerClusterManagerService} for querying.
 * @return a read-only view of the registry: it reflects later changes made by the manager,
 *         but rejects modification attempts with {@link UnsupportedOperationException}
 */
public Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
    // Hand out a view rather than the live map so query-side callers cannot alter the manager's state.
    return Collections.unmodifiableMap(WorkerClusterManagerService.getAppId2ClusterStatus());
}
/**
 * Fetches the deployment status of a container within an application.
 * @param appId application ID
 * @param containerId container ID
 * @return deployment status as returned by the app's cluster status holder; an empty list when the app has none
 */
public List<DeployedContainerInfo> getDeployedContainerInfos(Long appId, Long containerId) {
    final ClusterStatusHolder holder = getAppId2ClusterStatus().get(appId);
    if (holder != null) {
        return holder.getDeployedContainerInfos(containerId);
    }
    return Collections.emptyList();
}
/**
 * Resolves the address-to-worker map of an application.
 * @param appId appId
 * @return the map returned by the app's cluster status holder; an empty map (after a warning is logged) when the app has none
 */
private Map<String, WorkerInfo> getWorkerInfosByAppId(Long appId) {
    final ClusterStatusHolder holder = getAppId2ClusterStatus().get(appId);
    if (holder != null) {
        return holder.getAllWorkers();
    }
    // No cluster registered for this app yet: log it and answer with an empty result.
    log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
    return Collections.emptyMap();
}
/**
* filter invalid worker for job
* @param workerInfo worker info
* @param jobInfo job info
* @return filter this worker when return true
*/
private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) { private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) {
for (WorkerFilter filter : workerFilters) { for (WorkerFilter filter : workerFilters) {
if (filter.filter(workerInfo, jobInfo)) { if (filter.filter(workerInfo, jobInfo)) {

View File

@ -10,15 +10,16 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.common.utils.SegmentLock; import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import com.github.kfcfans.powerjob.server.remote.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType; import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils; import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import com.github.kfcfans.powerjob.server.extension.LockService;
import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager; import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; import com.github.kfcfans.powerjob.server.remote.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.extension.LockService; import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.github.kfcfans.powerjob.server.web.request.SaveContainerInfoRequest; import com.github.kfcfans.powerjob.server.web.request.SaveContainerInfoRequest;
import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -54,6 +55,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/** /**
* 容器服务 * 容器服务
@ -74,6 +76,9 @@ public class ContainerService {
@Resource @Resource
private GridFsManager gridFsManager; private GridFsManager gridFsManager;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
// 下载用的分段锁 // 下载用的分段锁
private final SegmentLock segmentLock = new SegmentLock(4); private final SegmentLock segmentLock = new SegmentLock(4);
// 并发部署的机器数量 // 并发部署的机器数量
@ -125,8 +130,8 @@ public class ContainerService {
} }
ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId()); ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId());
WorkerClusterManagerService.getActiveWorkerInfo(container.getAppId()).keySet().forEach(akkaAddress -> { workerClusterQueryService.getAllAliveWorkers(container.getAppId()).forEach(workerInfo -> {
ActorSelection workerActor = AkkaStarter.getWorkerActor(akkaAddress); ActorSelection workerActor = AkkaStarter.getWorkerActor(workerInfo.getAddress());
workerActor.tell(destroyRequest, null); workerActor.tell(destroyRequest, null);
}); });
@ -247,7 +252,10 @@ public class ContainerService {
containerInfoRepository.saveAndFlush(container); containerInfoRepository.saveAndFlush(container);
// 开始部署需要分批进行 // 开始部署需要分批进行
Set<String> workerAddressList = WorkerClusterManagerService.getActiveWorkerInfo(container.getAppId()).keySet(); Set<String> workerAddressList = workerClusterQueryService.getAllAliveWorkers(container.getAppId())
.stream()
.map(WorkerInfo::getAddress)
.collect(Collectors.toSet());
if (workerAddressList.isEmpty()) { if (workerAddressList.isEmpty()) {
remote.sendText("SYSTEM: there is no worker available now, deploy failed!"); remote.sendText("SYSTEM: there is no worker available now, deploy failed!");
return; return;
@ -284,9 +292,12 @@ public class ContainerService {
* @return 拼接好的可阅读字符串 * @return 拼接好的可阅读字符串
*/ */
public String fetchDeployedInfo(Long appId, Long containerId) { public String fetchDeployedInfo(Long appId, Long containerId) {
List<DeployedContainerInfo> infoList = WorkerClusterManagerService.getDeployedContainerInfos(appId, containerId); List<DeployedContainerInfo> infoList = workerClusterQueryService.getDeployedContainerInfos(appId, containerId);
Set<String> aliveWorkers = WorkerClusterManagerService.getActiveWorkerInfo(appId).keySet(); Set<String> aliveWorkers = workerClusterQueryService.getAllAliveWorkers(appId)
.stream()
.map(WorkerInfo::getAddress)
.collect(Collectors.toSet());
Set<String> deployedList = Sets.newLinkedHashSet(); Set<String> deployedList = Sets.newLinkedHashSet();
List<String> unDeployedList = Lists.newLinkedList(); List<String> unDeployedList = Lists.newLinkedList();

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.service.instance;
import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.*;
import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq; import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq;
@ -55,6 +56,9 @@ public class InstanceService {
@Resource @Resource
private InstanceInfoRepository instanceInfoRepository; private InstanceInfoRepository instanceInfoRepository;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
/** /**
* 创建任务实例注意该方法并不调用 saveAndFlush如果有需要立即同步到DB的需求请在方法结束后手动调用 flush * 创建任务实例注意该方法并不调用 saveAndFlush如果有需要立即同步到DB的需求请在方法结束后手动调用 flush
* ******************************************** * ********************************************
@ -124,7 +128,7 @@ public class InstanceService {
不可靠通知停止 TaskTracker 不可靠通知停止 TaskTracker
假如没有成功关闭之后 TaskTracker 会再次 reportStatus按照流程instanceLog 会被更新为 RUNNING开发者可以再次手动关闭 假如没有成功关闭之后 TaskTracker 会再次 reportStatus按照流程instanceLog 会被更新为 RUNNING开发者可以再次手动关闭
*/ */
Optional<WorkerInfo> workerInfoOpt = WorkerClusterManagerService.getWorkerInfo(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress()); Optional<WorkerInfo> workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress());
if (workerInfoOpt.isPresent()) { if (workerInfoOpt.isPresent()) {
ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId); ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId);
WorkerInfo workerInfo = workerInfoOpt.get(); WorkerInfo workerInfo = workerInfoOpt.get();
@ -266,7 +270,7 @@ public class InstanceService {
return detail; return detail;
} }
Optional<WorkerInfo> workerInfoOpt = WorkerClusterManagerService.getWorkerInfo(instanceInfoDO.getAppId(), instanceInfoDO.getTaskTrackerAddress()); Optional<WorkerInfo> workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfoDO.getAppId(), instanceInfoDO.getTaskTrackerAddress());
if (workerInfoOpt.isPresent()) { if (workerInfoOpt.isPresent()) {
WorkerInfo workerInfo = workerInfoOpt.get(); WorkerInfo workerInfo = workerInfoOpt.get();
ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId); ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId);

View File

@ -10,6 +10,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService;
import com.github.kfcfans.powerjob.server.remote.transport.TransportService; import com.github.kfcfans.powerjob.server.remote.transport.TransportService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
@ -36,6 +37,8 @@ public class ServerController {
private ServerElectionService serverElectionService; private ServerElectionService serverElectionService;
@Resource @Resource
private AppInfoRepository appInfoRepository; private AppInfoRepository appInfoRepository;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
@GetMapping("/assert") @GetMapping("/assert")
public ResultDTO<Long> assertAppName(String appName) { public ResultDTO<Long> assertAppName(String appName) {
@ -56,9 +59,9 @@ public class ServerController {
res.put("communicationSystemInfo", transportService.getProtocol2Transporter()); res.put("communicationSystemInfo", transportService.getProtocol2Transporter());
res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis())); res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis()));
res.put("serverTimeZone", TimeZone.getDefault().getDisplayName()); res.put("serverTimeZone", TimeZone.getDefault().getDisplayName());
res.put("appIds", WorkerClusterManagerService.getAppId2ClusterStatus().keySet()); res.put("appIds", workerClusterQueryService.getAppId2ClusterStatus().keySet());
if (debug) { if (debug) {
res.put("appId2ClusterInfo", JSON.parseObject(JSON.toJSONString(WorkerClusterManagerService.getAppId2ClusterStatus()))); res.put("appId2ClusterInfo", JSON.parseObject(JSON.toJSONString(workerClusterQueryService.getAppId2ClusterStatus())));
} }
return ResultDTO.success(res); return ResultDTO.success(res);