mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: refactor worker request handler and add monitor
This commit is contained in:
parent
48ac446014
commit
614349370a
@ -0,0 +1,161 @@
|
||||
package tech.powerjob.server.core.handler;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.core.env.Environment;
|
||||
import tech.powerjob.common.request.*;
|
||||
import tech.powerjob.common.response.AskResponse;
|
||||
import tech.powerjob.common.serialize.JsonUtils;
|
||||
import tech.powerjob.common.utils.NetUtils;
|
||||
import tech.powerjob.server.common.constants.SwitchableStatus;
|
||||
import tech.powerjob.server.common.module.WorkerInfo;
|
||||
import tech.powerjob.server.common.utils.SpringUtils;
|
||||
import tech.powerjob.server.monitor.MonitorService;
|
||||
import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent;
|
||||
import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent;
|
||||
import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent;
|
||||
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* wrapper monitor for IWorkerRequestHandler
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/11
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
|
||||
|
||||
@Resource
|
||||
protected MonitorService monitorService;
|
||||
@Resource
|
||||
protected Environment environment;
|
||||
@Resource
|
||||
protected ContainerInfoRepository containerInfoRepository;
|
||||
@Resource
|
||||
private WorkerClusterQueryService workerClusterQueryService;
|
||||
|
||||
protected abstract void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event);
|
||||
|
||||
protected abstract Optional<AskResponse> processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception;
|
||||
|
||||
protected abstract void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event);
|
||||
|
||||
|
||||
@Override
|
||||
public void processWorkerHeartbeat(WorkerHeartbeat heartbeat) {
|
||||
long startMs = System.currentTimeMillis();
|
||||
WorkerHeartbeatEvent event = new WorkerHeartbeatEvent()
|
||||
.setAppName(heartbeat.getAppName())
|
||||
.setAppId(heartbeat.getAppId())
|
||||
.setVersion(heartbeat.getVersion())
|
||||
.setProtocol(heartbeat.getProtocol())
|
||||
.setTag(heartbeat.getTag())
|
||||
.setWorkerAddress(heartbeat.getWorkerAddress())
|
||||
.setDelayMs(startMs - heartbeat.getHeartbeatTime())
|
||||
.setScore(heartbeat.getSystemMetrics().getScore());
|
||||
processWorkerHeartbeat0(heartbeat, event);
|
||||
monitorService.monitor(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<AskResponse> processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req) {
|
||||
long startMs = System.currentTimeMillis();
|
||||
TtReportInstanceStatusEvent event = new TtReportInstanceStatusEvent()
|
||||
.setAppId(req.getAppId())
|
||||
.setJobId(req.getJobId())
|
||||
.setInstanceId(req.getInstanceId())
|
||||
.setWfInstanceId(req.getWfInstanceId())
|
||||
.setInstanceStatus(req.getInstanceStatus())
|
||||
.setDelayMs(startMs - req.getReportTime())
|
||||
.setServerProcessStatus(TtReportInstanceStatusEvent.Status.SUCCESS);
|
||||
try {
|
||||
return processTaskTrackerReportInstanceStatus0(req, event);
|
||||
} catch (Exception e) {
|
||||
event.setServerProcessStatus(TtReportInstanceStatusEvent.Status.FAILED);
|
||||
log.error("[WorkerRequestHandler] processTaskTrackerReportInstanceStatus failed for request: {}", req, e);
|
||||
return Optional.of(AskResponse.failed(ExceptionUtils.getMessage(e)));
|
||||
} finally {
|
||||
event.setServerProcessCost(System.currentTimeMillis() - startMs);
|
||||
monitorService.monitor(event);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processWorkerLogReport(WorkerLogReportReq req) {
|
||||
|
||||
long startMs = System.currentTimeMillis();
|
||||
WorkerLogReportEvent event = new WorkerLogReportEvent()
|
||||
.setWorkerAddress(req.getWorkerAddress());
|
||||
try {
|
||||
processWorkerLogReport0(req, event);
|
||||
event.setStatus(WorkerLogReportEvent.Status.SUCCESS);
|
||||
} catch (RejectedExecutionException re) {
|
||||
event.setStatus(WorkerLogReportEvent.Status.REJECTED);
|
||||
} catch (Throwable t) {
|
||||
event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
|
||||
log.warn("[WorkerRequestHandler] process worker report failed!", t);
|
||||
} finally {
|
||||
event.setServerCost(System.currentTimeMillis() - startMs);
|
||||
monitorService.monitor(event);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req) {
|
||||
AskResponse askResponse;
|
||||
|
||||
Long jobId = req.getJobId();
|
||||
Long appId = req.getAppId();
|
||||
|
||||
JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class);
|
||||
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
|
||||
if (jobInfoOpt.isPresent()) {
|
||||
JobInfoDO jobInfo = jobInfoOpt.get();
|
||||
if (!jobInfo.getAppId().equals(appId)) {
|
||||
askResponse = AskResponse.failed("Permission Denied!");
|
||||
}else {
|
||||
List<String> sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo)
|
||||
.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
|
||||
askResponse = AskResponse.succeed(sortedAvailableWorker);
|
||||
}
|
||||
}else {
|
||||
askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId);
|
||||
}
|
||||
return askResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest req) {
|
||||
String port = environment.getProperty("local.server.port");
|
||||
|
||||
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(req.getContainerId());
|
||||
AskResponse askResponse = new AskResponse();
|
||||
if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) {
|
||||
askResponse.setSuccess(false);
|
||||
askResponse.setMessage("can't find container by id: " + req.getContainerId());
|
||||
}else {
|
||||
ContainerInfoDO containerInfo = containerInfoOpt.get();
|
||||
askResponse.setSuccess(true);
|
||||
|
||||
ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest();
|
||||
BeanUtils.copyProperties(containerInfo, dpReq);
|
||||
dpReq.setContainerId(containerInfo.getId());
|
||||
String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, containerInfo.getVersion());
|
||||
dpReq.setDownloadURL(downloadURL);
|
||||
|
||||
askResponse.setData(JsonUtils.toBytes(dpReq));
|
||||
}
|
||||
return askResponse;
|
||||
}
|
||||
}
|
@ -0,0 +1,48 @@
|
||||
package tech.powerjob.server.core.handler;
|
||||
|
||||
import tech.powerjob.common.request.*;
|
||||
import tech.powerjob.common.response.AskResponse;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* 定义 server 与 worker 之间需要处理的协议
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/10
|
||||
*/
|
||||
public interface IWorkerRequestHandler {
|
||||
|
||||
/**
|
||||
* 处理 worker 上报的心跳信息
|
||||
* @param heartbeat 心跳信息
|
||||
*/
|
||||
void processWorkerHeartbeat(WorkerHeartbeat heartbeat);
|
||||
|
||||
/**
|
||||
* 处理 TaskTracker 的任务实例上报
|
||||
* @param req 上报请求
|
||||
* @return 响应信息
|
||||
*/
|
||||
Optional<AskResponse> processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req);
|
||||
|
||||
/**
|
||||
* 处理 worker 查询执行器集群
|
||||
* @param req 请求
|
||||
* @return cluster info
|
||||
*/
|
||||
AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req);
|
||||
|
||||
/**
|
||||
* 处理 worker 日志推送请求
|
||||
* @param req 请求
|
||||
*/
|
||||
void processWorkerLogReport(WorkerLogReportReq req);
|
||||
|
||||
/**
|
||||
* 处理 worker 的容器部署请求
|
||||
* @param request 请求
|
||||
* @return 容器部署信息
|
||||
*/
|
||||
AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest request);
|
||||
}
|
@ -1,180 +0,0 @@
|
||||
package tech.powerjob.server.core.handler;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import tech.powerjob.common.RemoteConstant;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.request.*;
|
||||
import tech.powerjob.common.response.AskResponse;
|
||||
import tech.powerjob.common.serialize.JsonUtils;
|
||||
import tech.powerjob.common.utils.NetUtils;
|
||||
import tech.powerjob.server.common.constants.SwitchableStatus;
|
||||
import tech.powerjob.server.common.module.WorkerInfo;
|
||||
import tech.powerjob.server.common.utils.SpringUtils;
|
||||
import tech.powerjob.server.core.handler.impl.WorkerRequestAkkaHandler;
|
||||
import tech.powerjob.server.core.handler.impl.WorkerRequestHttpHandler;
|
||||
import tech.powerjob.server.core.instance.InstanceLogService;
|
||||
import tech.powerjob.server.core.instance.InstanceManager;
|
||||
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
|
||||
import tech.powerjob.server.monitor.MonitorService;
|
||||
import tech.powerjob.server.monitor.events.handler.WorkerHeartbeatEvent;
|
||||
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
|
||||
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
|
||||
import tech.powerjob.server.remote.transport.starter.VertXStarter;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* receive and process worker's request
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2021/2/8
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class WorkerRequestHandler {
|
||||
|
||||
@Resource
|
||||
private MonitorService monitorService;
|
||||
@Resource
|
||||
private Environment environment;
|
||||
@Resource
|
||||
private InstanceManager instanceManager;
|
||||
@Resource
|
||||
private WorkflowInstanceManager workflowInstanceManager;
|
||||
@Resource
|
||||
private InstanceLogService instanceLogService;
|
||||
@Resource
|
||||
private ContainerInfoRepository containerInfoRepository;
|
||||
|
||||
@Resource
|
||||
private WorkerClusterQueryService workerClusterQueryService;
|
||||
|
||||
private static WorkerRequestHandler workerRequestHandler;
|
||||
|
||||
@PostConstruct
|
||||
public void initHandler() {
|
||||
// init akka
|
||||
AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME);
|
||||
// init vert.x
|
||||
VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler());
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 Worker 的心跳请求
|
||||
* @param heartbeat 心跳包
|
||||
*/
|
||||
public void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) {
|
||||
|
||||
WorkerHeartbeatEvent event = new WorkerHeartbeatEvent();
|
||||
BeanUtils.copyProperties(heartbeat, event);
|
||||
monitorService.monitor(event.setScore(heartbeat.getSystemMetrics().getScore()));
|
||||
|
||||
WorkerClusterManagerService.updateStatus(heartbeat);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 instance 状态
|
||||
* @param req 任务实例的状态上报请求
|
||||
*/
|
||||
public Optional<AskResponse> onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) throws ExecutionException {
|
||||
// 2021/02/05 如果是工作流中的实例先尝试更新上下文信息,再更新实例状态,这里一定不会有异常
|
||||
if (req.getWfInstanceId() != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) {
|
||||
// 更新工作流上下文信息
|
||||
workflowInstanceManager.updateWorkflowContext(req.getWfInstanceId(),req.getAppendedWfContext());
|
||||
}
|
||||
|
||||
instanceManager.updateStatus(req);
|
||||
|
||||
// 结束状态(成功/失败)需要回复消息
|
||||
if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) {
|
||||
return Optional.of(AskResponse.succeed(null));
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理OMS在线日志请求
|
||||
* @param req 日志请求
|
||||
*/
|
||||
public void onReceiveWorkerLogReportReq(WorkerLogReportReq req) {
|
||||
// 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧...
|
||||
instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 Worker容器部署请求
|
||||
* @param req 容器部署请求
|
||||
*/
|
||||
public AskResponse onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) {
|
||||
|
||||
String port = environment.getProperty("local.server.port");
|
||||
|
||||
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(req.getContainerId());
|
||||
AskResponse askResponse = new AskResponse();
|
||||
if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) {
|
||||
askResponse.setSuccess(false);
|
||||
askResponse.setMessage("can't find container by id: " + req.getContainerId());
|
||||
}else {
|
||||
ContainerInfoDO containerInfo = containerInfoOpt.get();
|
||||
askResponse.setSuccess(true);
|
||||
|
||||
ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest();
|
||||
BeanUtils.copyProperties(containerInfo, dpReq);
|
||||
dpReq.setContainerId(containerInfo.getId());
|
||||
String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, containerInfo.getVersion());
|
||||
dpReq.setDownloadURL(downloadURL);
|
||||
|
||||
askResponse.setData(JsonUtils.toBytes(dpReq));
|
||||
}
|
||||
return askResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 worker 请求获取当前任务所有处理器节点的请求
|
||||
* @param req jobId + appId
|
||||
*/
|
||||
public AskResponse onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) {
|
||||
|
||||
AskResponse askResponse;
|
||||
|
||||
Long jobId = req.getJobId();
|
||||
Long appId = req.getAppId();
|
||||
|
||||
JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class);
|
||||
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
|
||||
if (jobInfoOpt.isPresent()) {
|
||||
JobInfoDO jobInfo = jobInfoOpt.get();
|
||||
if (!jobInfo.getAppId().equals(appId)) {
|
||||
askResponse = AskResponse.failed("Permission Denied!");
|
||||
}else {
|
||||
List<String> sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo)
|
||||
.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
|
||||
askResponse = AskResponse.succeed(sortedAvailableWorker);
|
||||
}
|
||||
}else {
|
||||
askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId);
|
||||
}
|
||||
return askResponse;
|
||||
}
|
||||
|
||||
public static WorkerRequestHandler getWorkerRequestHandler() {
|
||||
if (workerRequestHandler == null) {
|
||||
workerRequestHandler = SpringUtils.getBean(WorkerRequestHandler.class);
|
||||
}
|
||||
return workerRequestHandler;
|
||||
}
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
package tech.powerjob.server.core.handler;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* WorkerRequestHandlerHolder
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/11
|
||||
*/
|
||||
@Component
|
||||
public class WorkerRequestHandlerHolder {
|
||||
|
||||
private static IWorkerRequestHandler workerRequestHandler;
|
||||
|
||||
|
||||
public static IWorkerRequestHandler fetchWorkerRequestHandler() {
|
||||
return workerRequestHandler;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public void setWorkerRequestHandler(IWorkerRequestHandler workerRequestHandler) {
|
||||
WorkerRequestHandlerHolder.workerRequestHandler = workerRequestHandler;
|
||||
}
|
||||
}
|
@ -0,0 +1,67 @@
|
||||
package tech.powerjob.server.core.handler;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
|
||||
import tech.powerjob.common.request.WorkerHeartbeat;
|
||||
import tech.powerjob.common.request.WorkerLogReportReq;
|
||||
import tech.powerjob.common.response.AskResponse;
|
||||
import tech.powerjob.server.core.instance.InstanceLogService;
|
||||
import tech.powerjob.server.core.instance.InstanceManager;
|
||||
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
|
||||
import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent;
|
||||
import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent;
|
||||
import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* receive and process worker's request
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/11
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler {
|
||||
|
||||
@Resource
|
||||
private InstanceManager instanceManager;
|
||||
@Resource
|
||||
private WorkflowInstanceManager workflowInstanceManager;
|
||||
@Resource
|
||||
private InstanceLogService instanceLogService;
|
||||
|
||||
@Override
|
||||
protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) {
|
||||
WorkerClusterManagerService.updateStatus(heartbeat);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Optional<AskResponse> processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception {
|
||||
// 2021/02/05 如果是工作流中的实例先尝试更新上下文信息,再更新实例状态,这里一定不会有异常
|
||||
if (req.getWfInstanceId() != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) {
|
||||
// 更新工作流上下文信息
|
||||
workflowInstanceManager.updateWorkflowContext(req.getWfInstanceId(),req.getAppendedWfContext());
|
||||
}
|
||||
|
||||
instanceManager.updateStatus(req);
|
||||
|
||||
// 结束状态(成功/失败)需要回复消息
|
||||
if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) {
|
||||
return Optional.of(AskResponse.succeed(null));
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {
|
||||
// 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧...
|
||||
instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
|
||||
}
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
package tech.powerjob.server.core.handler.impl;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
import tech.powerjob.common.RemoteConstant;
|
||||
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
|
||||
import tech.powerjob.server.remote.transport.starter.VertXStarter;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
/**
|
||||
* 初始化器
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/11
|
||||
*/
|
||||
@Component
|
||||
public class Initializer {
|
||||
|
||||
@PostConstruct
|
||||
public void initHandler() {
|
||||
// init akka
|
||||
AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME);
|
||||
// init vert.x
|
||||
VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler());
|
||||
}
|
||||
}
|
@ -10,7 +10,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static tech.powerjob.server.core.handler.WorkerRequestHandler.getWorkerRequestHandler;
|
||||
import static tech.powerjob.server.core.handler.WorkerRequestHandlerHolder.fetchWorkerRequestHandler;
|
||||
|
||||
/**
|
||||
* 处理 Worker 请求
|
||||
@ -42,9 +42,9 @@ public class WorkerRequestAkkaHandler extends AbstractActor {
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(WorkerHeartbeat.class, hb -> getWorkerRequestHandler().onReceiveWorkerHeartbeat(hb))
|
||||
.match(WorkerHeartbeat.class, hb -> fetchWorkerRequestHandler().processWorkerHeartbeat(hb))
|
||||
.match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq)
|
||||
.match(WorkerLogReportReq.class, req -> getWorkerRequestHandler().onReceiveWorkerLogReportReq(req))
|
||||
.match(WorkerLogReportReq.class, req -> fetchWorkerRequestHandler().processWorkerLogReport(req))
|
||||
.match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest)
|
||||
.match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq)
|
||||
.matchAny(obj -> log.warn("[WorkerRequestAkkaHandler] receive unknown request: {}.", obj))
|
||||
@ -71,7 +71,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor {
|
||||
private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) {
|
||||
|
||||
try {
|
||||
Optional<AskResponse> askResponseOpt = getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req);
|
||||
Optional<AskResponse> askResponseOpt = fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req);
|
||||
if (askResponseOpt.isPresent()) {
|
||||
getSender().tell(AskResponse.succeed(null), getSelf());
|
||||
}
|
||||
@ -85,7 +85,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor {
|
||||
* @param req 容器部署请求
|
||||
*/
|
||||
private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) {
|
||||
getSender().tell(getWorkerRequestHandler().onReceiveWorkerNeedDeployContainerRequest(req), getSelf());
|
||||
getSender().tell(fetchWorkerRequestHandler().processWorkerNeedDeployContainer(req), getSelf());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -94,7 +94,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor {
|
||||
*/
|
||||
private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) {
|
||||
|
||||
getSender().tell(getWorkerRequestHandler().onReceiveWorkerQueryExecutorClusterReq(req), getSelf());
|
||||
getSender().tell(fetchWorkerRequestHandler().processWorkerQueryExecutorCluster(req), getSelf());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import static tech.powerjob.server.core.handler.WorkerRequestHandler.getWorkerRequestHandler;
|
||||
import static tech.powerjob.server.core.handler.WorkerRequestHandlerHolder.fetchWorkerRequestHandler;
|
||||
|
||||
/**
|
||||
* WorkerRequestHandler
|
||||
@ -46,14 +46,14 @@ public class WorkerRequestHttpHandler extends AbstractVerticle {
|
||||
router.post(ProtocolConstant.SERVER_PATH_HEARTBEAT)
|
||||
.handler(ctx -> {
|
||||
WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class);
|
||||
getWorkerRequestHandler().onReceiveWorkerHeartbeat(heartbeat);
|
||||
fetchWorkerRequestHandler().processWorkerHeartbeat(heartbeat);
|
||||
success(ctx);
|
||||
});
|
||||
router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT)
|
||||
.blockingHandler(ctx -> {
|
||||
TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class);
|
||||
try {
|
||||
getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req);
|
||||
fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req);
|
||||
out(ctx, AskResponse.succeed(null));
|
||||
} catch (Exception e) {
|
||||
log.error("[WorkerRequestHttpHandler] update instance status failed for request: {}.", req, e);
|
||||
@ -63,7 +63,7 @@ public class WorkerRequestHttpHandler extends AbstractVerticle {
|
||||
router.post(ProtocolConstant.SERVER_PATH_LOG_REPORT)
|
||||
.blockingHandler(ctx -> {
|
||||
WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class);
|
||||
getWorkerRequestHandler().onReceiveWorkerLogReportReq(req);
|
||||
fetchWorkerRequestHandler().processWorkerLogReport(req);
|
||||
success(ctx);
|
||||
});
|
||||
server.requestHandler(router).listen(port);
|
||||
|
@ -1,26 +0,0 @@
|
||||
package tech.powerjob.server.monitor.events.handler;
|
||||
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
import tech.powerjob.server.monitor.Event;
|
||||
|
||||
/**
|
||||
* TaskTrackerReportInstanceStatus
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/9
|
||||
*/
|
||||
@Setter
|
||||
@Accessors(chain = true)
|
||||
public class TtReportInstanceStatusEvent implements Event {
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return "MONITOR_LOGGER_TT_REPORT_STATUS";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String message() {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -0,0 +1,46 @@
|
||||
package tech.powerjob.server.monitor.events.w2s;
|
||||
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
import tech.powerjob.server.common.SJ;
|
||||
import tech.powerjob.server.monitor.Event;
|
||||
|
||||
/**
|
||||
* TaskTrackerReportInstanceStatus
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/9
|
||||
*/
|
||||
@Setter
|
||||
@Accessors(chain = true)
|
||||
public class TtReportInstanceStatusEvent implements Event {
|
||||
|
||||
private Long appId;
|
||||
private Long jobId;
|
||||
private Long instanceId;
|
||||
|
||||
private Long wfInstanceId;
|
||||
|
||||
private int instanceStatus;
|
||||
|
||||
private Long delayMs;
|
||||
|
||||
private Status serverProcessStatus;
|
||||
|
||||
private Long serverProcessCost;
|
||||
|
||||
public enum Status {
|
||||
SUCCESS,
|
||||
FAILED
|
||||
}
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return "MONITOR_LOGGER_TT_REPORT_STATUS";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String message() {
|
||||
return SJ.MONITOR_JOINER.join(appId, jobId, instanceId, wfInstanceId, instanceStatus, delayMs, serverProcessStatus, serverProcessCost);
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package tech.powerjob.server.monitor.events.handler;
|
||||
package tech.powerjob.server.monitor.events.w2s;
|
||||
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
@ -26,7 +26,10 @@ public class WorkerHeartbeatEvent implements Event {
|
||||
|
||||
private String tag;
|
||||
private String workerAddress;
|
||||
|
||||
/**
|
||||
* worker 上报时间与 server 之间的延迟
|
||||
*/
|
||||
private long delayMs;
|
||||
private Integer score;
|
||||
|
||||
@Override
|
||||
@ -36,6 +39,6 @@ public class WorkerHeartbeatEvent implements Event {
|
||||
|
||||
@Override
|
||||
public String message() {
|
||||
return SJ.MONITOR_JOINER.join(appName, appId, version, protocol, tag, workerAddress, score);
|
||||
return SJ.MONITOR_JOINER.join(appName, appId, version, protocol, tag, workerAddress, delayMs, score);
|
||||
}
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
package tech.powerjob.server.monitor.events.w2s;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
import tech.powerjob.server.common.SJ;
|
||||
import tech.powerjob.server.monitor.Event;
|
||||
|
||||
/**
|
||||
* description
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/11
|
||||
*/
|
||||
@Setter
|
||||
@Accessors(chain = true)
|
||||
public class WorkerLogReportEvent implements Event {
|
||||
|
||||
private String workerAddress;
|
||||
|
||||
private long logSize;
|
||||
|
||||
private Status status;
|
||||
|
||||
private long serverCost;
|
||||
|
||||
public enum Status {
|
||||
SUCCESS,
|
||||
REJECTED,
|
||||
EXCEPTION
|
||||
}
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return "MONITOR_LOGGER_WORKER_LOG_REPORT";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String message() {
|
||||
return SJ.MONITOR_JOINER.join(workerAddress, logSize, status, serverCost);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user