From 2a6bb0b7f3a9e9a0ed5c5c5d23b2ddaac55faf59 Mon Sep 17 00:00:00 2001 From: Salieri Date: Sat, 17 Oct 2020 15:38:54 +0800 Subject: [PATCH] refactor: worker cluster query change to akka from htpp --- .../WorkerQueryExecutorClusterReq.java | 20 ++++++++++ .../server/akka/actors/ServerActor.java | 39 ++++++++++++++++--- .../web/controller/ServerController.java | 11 ------ 3 files changed, 54 insertions(+), 16 deletions(-) create mode 100644 powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/WorkerQueryExecutorClusterReq.java diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/WorkerQueryExecutorClusterReq.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/WorkerQueryExecutorClusterReq.java new file mode 100644 index 00000000..dd72c3c5 --- /dev/null +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/WorkerQueryExecutorClusterReq.java @@ -0,0 +1,20 @@ +package com.github.kfcfans.powerjob.common.request; + +import com.github.kfcfans.powerjob.common.OmsSerializable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * worker 查询 执行器集群(动态上线需要) + * + * @author tjq + * @since 10/17/20 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class WorkerQueryExecutorClusterReq implements OmsSerializable { + private Long appId; + private Long jobId; +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java index fdd82196..685a2056 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java @@ -2,18 +2,18 @@ package com.github.kfcfans.powerjob.server.akka.actors; import akka.actor.AbstractActor; import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest; -import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq; -import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat; -import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq; -import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerRequest; +import com.github.kfcfans.powerjob.common.PowerJobException; +import com.github.kfcfans.powerjob.common.request.*; import com.github.kfcfans.powerjob.common.response.AskResponse; +import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository; +import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.service.InstanceLogService; import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.core.env.Environment; +import java.util.List; import java.util.Optional; /** @@ -41,6 +42,7 @@ public class ServerActor extends AbstractActor { .match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq) .match(WorkerLogReportReq.class, this::onReceiveWorkerLogReportReq) .match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest) + .match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq) .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) .build(); } @@ -110,6 +112,33 @@ public class ServerActor extends AbstractActor { getSender().tell(askResponse, getSelf()); } + /** + * 处理 worker 请求获取当前任务所有处理器节点的请求 + * @param req jobId + appId + */ + private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) { + + AskResponse askResponse; + + Long jobId = req.getJobId(); + Long appId = req.getAppId(); + + JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class); + Optional jobInfoOpt = jobInfoRepository.findById(jobId); + if (jobInfoOpt.isPresent()) { + JobInfoDO jobInfo = jobInfoOpt.get(); + if (!jobInfo.getAppId().equals(appId)) { + askResponse = AskResponse.failed("Permission Denied!"); + }else { + List sortedAvailableWorker = WorkerManagerService.getSortedAvailableWorker(appId, jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace()); + askResponse = AskResponse.succeed(sortedAvailableWorker); + } + }else { + askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId); + } + getSender().tell(askResponse, getSelf()); + } + // 不需要加锁,从 Spring IOC 中重复取并没什么问题 private InstanceManager getInstanceManager() { if (instanceManager == null) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java index e6660eac..1de5f06e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java @@ -61,17 +61,6 @@ public class ServerController { return ResultDTO.success(server); } - @GetMapping("/acquireWorkerList") - public ResultDTO> acquireWorkerList(Long appId, Long jobId) { - Optional jobInfoOpt = jobInfoRepository.findById(jobId); - JobInfoDO jobInfo = jobInfoOpt.orElseThrow(() -> new PowerJobException("can't find jobInfo by id: " + jobId)); - if (!jobInfo.getAppId().equals(appId)) { - throw new PowerJobException("Permission denied!"); - } - List sortedAvailableWorker = WorkerManagerService.getSortedAvailableWorker(appId, jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace()); - return ResultDTO.success(sortedAvailableWorker); - } - @GetMapping("/hello") public ResultDTO ping(@RequestParam(required = false) boolean debug) { JSONObject res = new JSONObject();