diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/ClusterStatusHolder.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/ClusterStatusHolder.java index 24dd631d..98d20745 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/ClusterStatusHolder.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/ClusterStatusHolder.java @@ -36,6 +36,7 @@ public class ClusterStatusHolder { /** * 更新 worker 机器的状态 + * @param heartbeat 心跳请求 */ public void updateStatus(WorkerHeartbeat heartbeat) { @@ -64,26 +65,12 @@ public class ClusterStatusHolder { } } - public List getAllWorkers() { - return Lists.newLinkedList(address2WorkerInfo.values()); - } - - public WorkerInfo getWorkerInfo(String address) { - return address2WorkerInfo.get(address); - } - /** - * 获取当前连接的的机器详情 - * @return map + * 获取该集群所有的机器信息 + * @return 地址: 机器信息 */ - public Map getActiveWorkerInfo() { - Map res = Maps.newHashMap(); - address2WorkerInfo.forEach((address, workerInfo) -> { - if (!workerInfo.timeout()) { - res.put(address, workerInfo); - } - }); - return res; + public Map getAllWorkers() { + return address2WorkerInfo; } /** diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterManagerService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterManagerService.java index edad4602..b10d2097 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterManagerService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterManagerService.java @@ -1,12 +1,13 @@ package com.github.kfcfans.powerjob.server.remote.worker.cluster; -import com.github.kfcfans.powerjob.common.model.DeployedContainerInfo; import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; -import java.util.*; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * 管理 worker 集群信息 @@ -31,30 +32,6 @@ public class WorkerClusterManagerService { clusterStatusHolder.updateStatus(heartbeat); } - /** - * 获取某个 app 下所有连接过的机器信息 - * @param appId appId - * @return 所有连接过的机器信息列表 - */ - public static List getWorkerInfosByAppId(Long appId) { - ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); - if (clusterStatusHolder == null) { - log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId); - return Collections.emptyList(); - } - return clusterStatusHolder.getAllWorkers(); - } - - - public static Optional getWorkerInfo(Long appId, String address) { - ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); - if (clusterStatusHolder == null) { - log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId); - return Optional.empty(); - } - return Optional.ofNullable(clusterStatusHolder.getWorkerInfo(address)); - } - /** * 清理不需要的worker信息 * @param usingAppIds 需要维护的appId,其余的数据将被删除 @@ -64,32 +41,6 @@ public class WorkerClusterManagerService { appId2ClusterStatus.entrySet().removeIf(entry -> !keys.contains(entry.getKey())); } - /** - * 获取当前连接到该Server的Worker信息 - * @param appId 应用ID - * @return Worker信息 - */ - public static Map getActiveWorkerInfo(Long appId) { - ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); - if (clusterStatusHolder == null) { - return Collections.emptyMap(); - } - return clusterStatusHolder.getActiveWorkerInfo(); - } - - /** - * 获取某个应用容器的部署情况 - * @param appId 应用ID - * @param containerId 容器ID - * @return 部署情况 - */ - public static List getDeployedContainerInfos(Long appId, Long containerId) { - ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); - if (clusterStatusHolder == null) { - return Collections.emptyList(); - } - return clusterStatusHolder.getDeployedContainerInfos(containerId); - } /** * 清理缓存信息,防止 OOM @@ -98,7 +49,8 @@ public class WorkerClusterManagerService { appId2ClusterStatus.values().forEach(ClusterStatusHolder::release); } - public static Map getAppId2ClusterStatus() { + protected static Map getAppId2ClusterStatus() { return appId2ClusterStatus; } + } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterQueryService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterQueryService.java index 1986d2f3..ad5cf105 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterQueryService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterQueryService.java @@ -1,12 +1,18 @@ package com.github.kfcfans.powerjob.server.remote.worker.cluster; +import com.github.kfcfans.powerjob.common.model.DeployedContainerInfo; import com.github.kfcfans.powerjob.server.extension.WorkerFilter; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Optional; /** * 获取 worker 集群信息 @@ -14,6 +20,7 @@ import java.util.List; * @author tjq * @since 2021/2/19 */ +@Slf4j @Service public class WorkerClusterQueryService { @@ -31,7 +38,7 @@ public class WorkerClusterQueryService { */ public List getSuitableWorkers(JobInfoDO jobInfo) { - List workers = WorkerClusterManagerService.getWorkerInfosByAppId(jobInfo.getAppId()); + List workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values()); workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo)); @@ -47,11 +54,59 @@ public class WorkerClusterQueryService { @DesignateServer(appIdParameterName = "appId") public List getAllWorkers(Long appId) { - List workers = WorkerClusterManagerService.getWorkerInfosByAppId(appId); + List workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values()); workers.sort((o1, o2) -> o2 .getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore()); return workers; } + /** + * get all alive workers + * @param appId appId + * @return alive workers + */ + public List getAllAliveWorkers(Long appId) { + List workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values()); + workers.removeIf(WorkerInfo::timeout); + return workers; + } + + public Optional getWorkerInfoByAddress(Long appId, String address) { + return Optional.ofNullable(getWorkerInfosByAppId(appId).get(address)); + } + + public Map getAppId2ClusterStatus() { + return WorkerClusterManagerService.getAppId2ClusterStatus(); + } + + /** + * 获取某个应用容器的部署情况 + * @param appId 应用ID + * @param containerId 容器ID + * @return 部署情况 + */ + public List getDeployedContainerInfos(Long appId, Long containerId) { + ClusterStatusHolder clusterStatusHolder = getAppId2ClusterStatus().get(appId); + if (clusterStatusHolder == null) { + return Collections.emptyList(); + } + return clusterStatusHolder.getDeployedContainerInfos(containerId); + } + + private Map getWorkerInfosByAppId(Long appId) { + ClusterStatusHolder clusterStatusHolder = getAppId2ClusterStatus().get(appId); + if (clusterStatusHolder == null) { + log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId); + return Collections.emptyMap(); + } + return clusterStatusHolder.getAllWorkers(); + } + + /** + * filter invalid worker for job + * @param workerInfo worker info + * @param jobInfo job info + * @return filter this worker when return true + */ private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) { for (WorkerFilter filter : workerFilters) { if (filter.filter(workerInfo, jobInfo)) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java index 84f20ca0..8190a40f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java @@ -10,15 +10,16 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.common.utils.SegmentLock; -import com.github.kfcfans.powerjob.server.remote.transport.starter.AkkaStarter; import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils; +import com.github.kfcfans.powerjob.server.extension.LockService; import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository; import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager; -import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; -import com.github.kfcfans.powerjob.server.extension.LockService; +import com.github.kfcfans.powerjob.server.remote.transport.starter.AkkaStarter; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import com.github.kfcfans.powerjob.server.web.request.SaveContainerInfoRequest; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; @@ -54,6 +55,7 @@ import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * 容器服务 @@ -74,6 +76,9 @@ public class ContainerService { @Resource private GridFsManager gridFsManager; + @Resource + private WorkerClusterQueryService workerClusterQueryService; + // 下载用的分段锁 private final SegmentLock segmentLock = new SegmentLock(4); // 并发部署的机器数量 @@ -125,8 +130,8 @@ public class ContainerService { } ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId()); - WorkerClusterManagerService.getActiveWorkerInfo(container.getAppId()).keySet().forEach(akkaAddress -> { - ActorSelection workerActor = AkkaStarter.getWorkerActor(akkaAddress); + workerClusterQueryService.getAllAliveWorkers(container.getAppId()).forEach(workerInfo -> { + ActorSelection workerActor = AkkaStarter.getWorkerActor(workerInfo.getAddress()); workerActor.tell(destroyRequest, null); }); @@ -247,7 +252,10 @@ public class ContainerService { containerInfoRepository.saveAndFlush(container); // 开始部署(需要分批进行) - Set workerAddressList = WorkerClusterManagerService.getActiveWorkerInfo(container.getAppId()).keySet(); + Set workerAddressList = workerClusterQueryService.getAllAliveWorkers(container.getAppId()) + .stream() + .map(WorkerInfo::getAddress) + .collect(Collectors.toSet()); if (workerAddressList.isEmpty()) { remote.sendText("SYSTEM: there is no worker available now, deploy failed!"); return; @@ -284,9 +292,12 @@ public class ContainerService { * @return 拼接好的可阅读字符串 */ public String fetchDeployedInfo(Long appId, Long containerId) { - List infoList = WorkerClusterManagerService.getDeployedContainerInfos(appId, containerId); + List infoList = workerClusterQueryService.getDeployedContainerInfos(appId, containerId); - Set aliveWorkers = WorkerClusterManagerService.getActiveWorkerInfo(appId).keySet(); + Set aliveWorkers = workerClusterQueryService.getAllAliveWorkers(appId) + .stream() + .map(WorkerInfo::getAddress) + .collect(Collectors.toSet()); Set deployedList = Sets.newLinkedHashSet(); List unDeployedList = Lists.newLinkedList(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java index a6064873..4307987f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java @@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.service.instance; import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.model.InstanceDetail; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService; import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq; @@ -55,6 +56,9 @@ public class InstanceService { @Resource private InstanceInfoRepository instanceInfoRepository; + @Resource + private WorkerClusterQueryService workerClusterQueryService; + /** * 创建任务实例(注意,该方法并不调用 saveAndFlush,如果有需要立即同步到DB的需求,请在方法结束后手动调用 flush) * ******************************************** @@ -124,7 +128,7 @@ public class InstanceService { 不可靠通知停止 TaskTracker 假如没有成功关闭,之后 TaskTracker 会再次 reportStatus,按照流程,instanceLog 会被更新为 RUNNING,开发者可以再次手动关闭 */ - Optional workerInfoOpt = WorkerClusterManagerService.getWorkerInfo(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress()); + Optional workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress()); if (workerInfoOpt.isPresent()) { ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId); WorkerInfo workerInfo = workerInfoOpt.get(); @@ -266,7 +270,7 @@ public class InstanceService { return detail; } - Optional workerInfoOpt = WorkerClusterManagerService.getWorkerInfo(instanceInfoDO.getAppId(), instanceInfoDO.getTaskTrackerAddress()); + Optional workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfoDO.getAppId(), instanceInfoDO.getTaskTrackerAddress()); if (workerInfoOpt.isPresent()) { WorkerInfo workerInfo = workerInfoOpt.get(); ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java index 57d46d3b..fb419164 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java @@ -10,6 +10,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; import com.github.kfcfans.powerjob.server.remote.transport.TransportService; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -36,6 +37,8 @@ public class ServerController { private ServerElectionService serverElectionService; @Resource private AppInfoRepository appInfoRepository; + @Resource + private WorkerClusterQueryService workerClusterQueryService; @GetMapping("/assert") public ResultDTO assertAppName(String appName) { @@ -56,9 +59,9 @@ public class ServerController { res.put("communicationSystemInfo", transportService.getProtocol2Transporter()); res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis())); res.put("serverTimeZone", TimeZone.getDefault().getDisplayName()); - res.put("appIds", WorkerClusterManagerService.getAppId2ClusterStatus().keySet()); + res.put("appIds", workerClusterQueryService.getAppId2ClusterStatus().keySet()); if (debug) { - res.put("appId2ClusterInfo", JSON.parseObject(JSON.toJSONString(WorkerClusterManagerService.getAppId2ClusterStatus()))); + res.put("appId2ClusterInfo", JSON.parseObject(JSON.toJSONString(workerClusterQueryService.getAppId2ClusterStatus()))); } return ResultDTO.success(res);