refactor: worker cluster query change to akka from htpp

This commit is contained in:
Salieri 2020-10-17 15:38:54 +08:00
parent c7812708c3
commit 2a6bb0b7f3
3 changed files with 54 additions and 16 deletions

View File

@ -0,0 +1,20 @@
package com.github.kfcfans.powerjob.common.request;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* worker 查询 执行器集群动态上线需要
*
* @author tjq
* @since 10/17/20
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WorkerQueryExecutorClusterReq implements OmsSerializable {
private Long appId;
private Long jobId;
}

View File

@ -2,18 +2,18 @@ package com.github.kfcfans.powerjob.server.akka.actors;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.*;
import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat;
import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq;
import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerRequest;
import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.InstanceLogService; import com.github.kfcfans.powerjob.server.service.InstanceLogService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import java.util.List;
import java.util.Optional; import java.util.Optional;
/** /**
@ -41,6 +42,7 @@ public class ServerActor extends AbstractActor {
.match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq) .match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq)
.match(WorkerLogReportReq.class, this::onReceiveWorkerLogReportReq) .match(WorkerLogReportReq.class, this::onReceiveWorkerLogReportReq)
.match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest) .match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest)
.match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build(); .build();
} }
@ -110,6 +112,33 @@ public class ServerActor extends AbstractActor {
getSender().tell(askResponse, getSelf()); getSender().tell(askResponse, getSelf());
} }
/**
* 处理 worker 请求获取当前任务所有处理器节点的请求
* @param req jobId + appId
*/
private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) {
AskResponse askResponse;
Long jobId = req.getJobId();
Long appId = req.getAppId();
JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class);
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
if (jobInfoOpt.isPresent()) {
JobInfoDO jobInfo = jobInfoOpt.get();
if (!jobInfo.getAppId().equals(appId)) {
askResponse = AskResponse.failed("Permission Denied!");
}else {
List<String> sortedAvailableWorker = WorkerManagerService.getSortedAvailableWorker(appId, jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace());
askResponse = AskResponse.succeed(sortedAvailableWorker);
}
}else {
askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId);
}
getSender().tell(askResponse, getSelf());
}
// 不需要加锁 Spring IOC 中重复取并没什么问题 // 不需要加锁 Spring IOC 中重复取并没什么问题
private InstanceManager getInstanceManager() { private InstanceManager getInstanceManager() {
if (instanceManager == null) { if (instanceManager == null) {

View File

@ -61,17 +61,6 @@ public class ServerController {
return ResultDTO.success(server); return ResultDTO.success(server);
} }
@GetMapping("/acquireWorkerList")
public ResultDTO<List<String>> acquireWorkerList(Long appId, Long jobId) {
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
JobInfoDO jobInfo = jobInfoOpt.orElseThrow(() -> new PowerJobException("can't find jobInfo by id: " + jobId));
if (!jobInfo.getAppId().equals(appId)) {
throw new PowerJobException("Permission denied!");
}
List<String> sortedAvailableWorker = WorkerManagerService.getSortedAvailableWorker(appId, jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace());
return ResultDTO.success(sortedAvailableWorker);
}
@GetMapping("/hello") @GetMapping("/hello")
public ResultDTO<JSONObject> ping(@RequestParam(required = false) boolean debug) { public ResultDTO<JSONObject> ping(@RequestParam(required = false) boolean debug) {
JSONObject res = new JSONObject(); JSONObject res = new JSONObject();