From 614349370abdbb28903f608e4943ebcdb67d897a Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 11 Sep 2022 17:14:00 +0800 Subject: [PATCH] feat: refactor worker request handler and add monitor --- .../core/handler/AbWorkerRequestHandler.java | 161 ++++++++++++++++ .../core/handler/IWorkerRequestHandler.java | 48 +++++ .../core/handler/WorkerRequestHandler.java | 180 ------------------ .../handler/WorkerRequestHandlerHolder.java | 28 +++ .../handler/WorkerRequestHandlerImpl.java | 67 +++++++ .../server/core/handler/impl/Initializer.java | 26 +++ .../impl/WorkerRequestAkkaHandler.java | 12 +- .../impl/WorkerRequestHttpHandler.java | 8 +- .../handler/TtReportInstanceStatusEvent.java | 26 --- .../w2s/TtReportInstanceStatusEvent.java | 46 +++++ .../WorkerHeartbeatEvent.java | 9 +- .../events/w2s/WorkerLogReportEvent.java | 42 ++++ 12 files changed, 434 insertions(+), 219 deletions(-) create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java delete mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandler.java create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java create mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/Initializer.java delete mode 100644 powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/handler/TtReportInstanceStatusEvent.java create mode 100644 powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/TtReportInstanceStatusEvent.java rename powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/{handler => w2s}/WorkerHeartbeatEvent.java (81%) create mode 100644 powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerLogReportEvent.java diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java new file mode 100644 index 00000000..81b61758 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java @@ -0,0 +1,161 @@ +package tech.powerjob.server.core.handler; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.beans.BeanUtils; +import org.springframework.core.env.Environment; +import tech.powerjob.common.request.*; +import tech.powerjob.common.response.AskResponse; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.module.WorkerInfo; +import tech.powerjob.server.common.utils.SpringUtils; +import tech.powerjob.server.monitor.MonitorService; +import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent; +import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent; +import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent; +import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; +import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; +import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; +import tech.powerjob.server.remote.worker.WorkerClusterQueryService; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.RejectedExecutionException; +import java.util.stream.Collectors; + +/** + * wrapper monitor for IWorkerRequestHandler + * + * @author tjq + * @since 2022/9/11 + */ +@Slf4j +public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler { + + @Resource + protected MonitorService monitorService; + @Resource + protected Environment environment; + @Resource + protected ContainerInfoRepository containerInfoRepository; + @Resource + private WorkerClusterQueryService workerClusterQueryService; + + protected abstract void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event); + + protected abstract Optional processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception; + + protected abstract void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event); + + + @Override + public void processWorkerHeartbeat(WorkerHeartbeat heartbeat) { + long startMs = System.currentTimeMillis(); + WorkerHeartbeatEvent event = new WorkerHeartbeatEvent() + .setAppName(heartbeat.getAppName()) + .setAppId(heartbeat.getAppId()) + .setVersion(heartbeat.getVersion()) + .setProtocol(heartbeat.getProtocol()) + .setTag(heartbeat.getTag()) + .setWorkerAddress(heartbeat.getWorkerAddress()) + .setDelayMs(startMs - heartbeat.getHeartbeatTime()) + .setScore(heartbeat.getSystemMetrics().getScore()); + processWorkerHeartbeat0(heartbeat, event); + monitorService.monitor(event); + } + + @Override + public Optional processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req) { + long startMs = System.currentTimeMillis(); + TtReportInstanceStatusEvent event = new TtReportInstanceStatusEvent() + .setAppId(req.getAppId()) + .setJobId(req.getJobId()) + .setInstanceId(req.getInstanceId()) + .setWfInstanceId(req.getWfInstanceId()) + .setInstanceStatus(req.getInstanceStatus()) + .setDelayMs(startMs - req.getReportTime()) + .setServerProcessStatus(TtReportInstanceStatusEvent.Status.SUCCESS); + try { + return processTaskTrackerReportInstanceStatus0(req, event); + } catch (Exception e) { + event.setServerProcessStatus(TtReportInstanceStatusEvent.Status.FAILED); + log.error("[WorkerRequestHandler] processTaskTrackerReportInstanceStatus failed for request: {}", req, e); + return Optional.of(AskResponse.failed(ExceptionUtils.getMessage(e))); + } finally { + event.setServerProcessCost(System.currentTimeMillis() - startMs); + monitorService.monitor(event); + } + } + + @Override + public void processWorkerLogReport(WorkerLogReportReq req) { + + long startMs = System.currentTimeMillis(); + WorkerLogReportEvent event = new WorkerLogReportEvent() + .setWorkerAddress(req.getWorkerAddress()); + try { + processWorkerLogReport0(req, event); + event.setStatus(WorkerLogReportEvent.Status.SUCCESS); + } catch (RejectedExecutionException re) { + event.setStatus(WorkerLogReportEvent.Status.REJECTED); + } catch (Throwable t) { + event.setStatus(WorkerLogReportEvent.Status.EXCEPTION); + log.warn("[WorkerRequestHandler] process worker report failed!", t); + } finally { + event.setServerCost(System.currentTimeMillis() - startMs); + monitorService.monitor(event); + } + } + + @Override + public AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req) { + AskResponse askResponse; + + Long jobId = req.getJobId(); + Long appId = req.getAppId(); + + JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class); + Optional jobInfoOpt = jobInfoRepository.findById(jobId); + if (jobInfoOpt.isPresent()) { + JobInfoDO jobInfo = jobInfoOpt.get(); + if (!jobInfo.getAppId().equals(appId)) { + askResponse = AskResponse.failed("Permission Denied!"); + }else { + List sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo) + .stream().map(WorkerInfo::getAddress).collect(Collectors.toList()); + askResponse = AskResponse.succeed(sortedAvailableWorker); + } + }else { + askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId); + } + return askResponse; + } + + @Override + public AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest req) { + String port = environment.getProperty("local.server.port"); + + Optional containerInfoOpt = containerInfoRepository.findById(req.getContainerId()); + AskResponse askResponse = new AskResponse(); + if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) { + askResponse.setSuccess(false); + askResponse.setMessage("can't find container by id: " + req.getContainerId()); + }else { + ContainerInfoDO containerInfo = containerInfoOpt.get(); + askResponse.setSuccess(true); + + ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest(); + BeanUtils.copyProperties(containerInfo, dpReq); + dpReq.setContainerId(containerInfo.getId()); + String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, containerInfo.getVersion()); + dpReq.setDownloadURL(downloadURL); + + askResponse.setData(JsonUtils.toBytes(dpReq)); + } + return askResponse; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java new file mode 100644 index 00000000..6fe30e70 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java @@ -0,0 +1,48 @@ +package tech.powerjob.server.core.handler; + +import tech.powerjob.common.request.*; +import tech.powerjob.common.response.AskResponse; + +import java.util.Optional; + +/** + * 定义 server 与 worker 之间需要处理的协议 + * + * @author tjq + * @since 2022/9/10 + */ +public interface IWorkerRequestHandler { + + /** + * 处理 worker 上报的心跳信息 + * @param heartbeat 心跳信息 + */ + void processWorkerHeartbeat(WorkerHeartbeat heartbeat); + + /** + * 处理 TaskTracker 的任务实例上报 + * @param req 上报请求 + * @return 响应信息 + */ + Optional processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req); + + /** + * 处理 worker 查询执行器集群 + * @param req 请求 + * @return cluster info + */ + AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req); + + /** + * 处理 worker 日志推送请求 + * @param req 请求 + */ + void processWorkerLogReport(WorkerLogReportReq req); + + /** + * 处理 worker 的容器部署请求 + * @param request 请求 + * @return 容器部署信息 + */ + AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest request); +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandler.java deleted file mode 100644 index 47be383e..00000000 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandler.java +++ /dev/null @@ -1,180 +0,0 @@ -package tech.powerjob.server.core.handler; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; -import org.springframework.core.env.Environment; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; -import tech.powerjob.common.RemoteConstant; -import tech.powerjob.common.enums.InstanceStatus; -import tech.powerjob.common.request.*; -import tech.powerjob.common.response.AskResponse; -import tech.powerjob.common.serialize.JsonUtils; -import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.common.module.WorkerInfo; -import tech.powerjob.server.common.utils.SpringUtils; -import tech.powerjob.server.core.handler.impl.WorkerRequestAkkaHandler; -import tech.powerjob.server.core.handler.impl.WorkerRequestHttpHandler; -import tech.powerjob.server.core.instance.InstanceLogService; -import tech.powerjob.server.core.instance.InstanceManager; -import tech.powerjob.server.core.workflow.WorkflowInstanceManager; -import tech.powerjob.server.monitor.MonitorService; -import tech.powerjob.server.monitor.events.handler.WorkerHeartbeatEvent; -import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; -import tech.powerjob.server.persistence.remote.model.JobInfoDO; -import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; -import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; -import tech.powerjob.server.remote.transport.starter.VertXStarter; -import tech.powerjob.server.remote.worker.WorkerClusterManagerService; -import tech.powerjob.server.remote.worker.WorkerClusterQueryService; - -import javax.annotation.PostConstruct; -import javax.annotation.Resource; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; - -/** - * receive and process worker's request - * - * @author tjq - * @since 2021/2/8 - */ -@Slf4j -@Component -public class WorkerRequestHandler { - - @Resource - private MonitorService monitorService; - @Resource - private Environment environment; - @Resource - private InstanceManager instanceManager; - @Resource - private WorkflowInstanceManager workflowInstanceManager; - @Resource - private InstanceLogService instanceLogService; - @Resource - private ContainerInfoRepository containerInfoRepository; - - @Resource - private WorkerClusterQueryService workerClusterQueryService; - - private static WorkerRequestHandler workerRequestHandler; - - @PostConstruct - public void initHandler() { - // init akka - AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME); - // init vert.x - VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler()); - } - - /** - * 处理 Worker 的心跳请求 - * @param heartbeat 心跳包 - */ - public void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) { - - WorkerHeartbeatEvent event = new WorkerHeartbeatEvent(); - BeanUtils.copyProperties(heartbeat, event); - monitorService.monitor(event.setScore(heartbeat.getSystemMetrics().getScore())); - - WorkerClusterManagerService.updateStatus(heartbeat); - } - - /** - * 处理 instance 状态 - * @param req 任务实例的状态上报请求 - */ - public Optional onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) throws ExecutionException { - // 2021/02/05 如果是工作流中的实例先尝试更新上下文信息,再更新实例状态,这里一定不会有异常 - if (req.getWfInstanceId() != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) { - // 更新工作流上下文信息 - workflowInstanceManager.updateWorkflowContext(req.getWfInstanceId(),req.getAppendedWfContext()); - } - - instanceManager.updateStatus(req); - - // 结束状态(成功/失败)需要回复消息 - if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) { - return Optional.of(AskResponse.succeed(null)); - } - return Optional.empty(); - } - - /** - * 处理OMS在线日志请求 - * @param req 日志请求 - */ - public void onReceiveWorkerLogReportReq(WorkerLogReportReq req) { - // 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧... - instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents()); - } - - /** - * 处理 Worker容器部署请求 - * @param req 容器部署请求 - */ - public AskResponse onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) { - - String port = environment.getProperty("local.server.port"); - - Optional containerInfoOpt = containerInfoRepository.findById(req.getContainerId()); - AskResponse askResponse = new AskResponse(); - if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) { - askResponse.setSuccess(false); - askResponse.setMessage("can't find container by id: " + req.getContainerId()); - }else { - ContainerInfoDO containerInfo = containerInfoOpt.get(); - askResponse.setSuccess(true); - - ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest(); - BeanUtils.copyProperties(containerInfo, dpReq); - dpReq.setContainerId(containerInfo.getId()); - String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, containerInfo.getVersion()); - dpReq.setDownloadURL(downloadURL); - - askResponse.setData(JsonUtils.toBytes(dpReq)); - } - return askResponse; - } - - /** - * 处理 worker 请求获取当前任务所有处理器节点的请求 - * @param req jobId + appId - */ - public AskResponse onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) { - - AskResponse askResponse; - - Long jobId = req.getJobId(); - Long appId = req.getAppId(); - - JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class); - Optional jobInfoOpt = jobInfoRepository.findById(jobId); - if (jobInfoOpt.isPresent()) { - JobInfoDO jobInfo = jobInfoOpt.get(); - if (!jobInfo.getAppId().equals(appId)) { - askResponse = AskResponse.failed("Permission Denied!"); - }else { - List sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo) - .stream().map(WorkerInfo::getAddress).collect(Collectors.toList()); - askResponse = AskResponse.succeed(sortedAvailableWorker); - } - }else { - askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId); - } - return askResponse; - } - - public static WorkerRequestHandler getWorkerRequestHandler() { - if (workerRequestHandler == null) { - workerRequestHandler = SpringUtils.getBean(WorkerRequestHandler.class); - } - return workerRequestHandler; - } -} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java new file mode 100644 index 00000000..ac259e03 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java @@ -0,0 +1,28 @@ +package tech.powerjob.server.core.handler; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * WorkerRequestHandlerHolder + * + * @author tjq + * @since 2022/9/11 + */ +@Component +public class WorkerRequestHandlerHolder { + + private static IWorkerRequestHandler workerRequestHandler; + + + public static IWorkerRequestHandler fetchWorkerRequestHandler() { + return workerRequestHandler; + } + + @Autowired + public void setWorkerRequestHandler(IWorkerRequestHandler workerRequestHandler) { + WorkerRequestHandlerHolder.workerRequestHandler = workerRequestHandler; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java new file mode 100644 index 00000000..77e573be --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java @@ -0,0 +1,67 @@ +package tech.powerjob.server.core.handler; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; +import tech.powerjob.common.request.WorkerHeartbeat; +import tech.powerjob.common.request.WorkerLogReportReq; +import tech.powerjob.common.response.AskResponse; +import tech.powerjob.server.core.instance.InstanceLogService; +import tech.powerjob.server.core.instance.InstanceManager; +import tech.powerjob.server.core.workflow.WorkflowInstanceManager; +import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent; +import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent; +import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent; +import tech.powerjob.server.remote.worker.WorkerClusterManagerService; + +import javax.annotation.Resource; +import java.util.Optional; + +/** + * receive and process worker's request + * + * @author tjq + * @since 2022/9/11 + */ +@Slf4j +@Component +public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler { + + @Resource + private InstanceManager instanceManager; + @Resource + private WorkflowInstanceManager workflowInstanceManager; + @Resource + private InstanceLogService instanceLogService; + + @Override + protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) { + WorkerClusterManagerService.updateStatus(heartbeat); + } + + @Override + protected Optional processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception { + // 2021/02/05 如果是工作流中的实例先尝试更新上下文信息,再更新实例状态,这里一定不会有异常 + if (req.getWfInstanceId() != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) { + // 更新工作流上下文信息 + workflowInstanceManager.updateWorkflowContext(req.getWfInstanceId(),req.getAppendedWfContext()); + } + + instanceManager.updateStatus(req); + + // 结束状态(成功/失败)需要回复消息 + if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) { + return Optional.of(AskResponse.succeed(null)); + } + return Optional.empty(); + } + + @Override + protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) { + // 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧... + instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents()); + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/Initializer.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/Initializer.java new file mode 100644 index 00000000..77951c32 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/Initializer.java @@ -0,0 +1,26 @@ +package tech.powerjob.server.core.handler.impl; + +import org.springframework.stereotype.Component; +import tech.powerjob.common.RemoteConstant; +import tech.powerjob.server.remote.transport.starter.AkkaStarter; +import tech.powerjob.server.remote.transport.starter.VertXStarter; + +import javax.annotation.PostConstruct; + +/** + * 初始化器 + * + * @author tjq + * @since 2022/9/11 + */ +@Component +public class Initializer { + + @PostConstruct + public void initHandler() { + // init akka + AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME); + // init vert.x + VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler()); + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java index 99aa0cd3..147b9df4 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java @@ -10,7 +10,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.Optional; -import static tech.powerjob.server.core.handler.WorkerRequestHandler.getWorkerRequestHandler; +import static tech.powerjob.server.core.handler.WorkerRequestHandlerHolder.fetchWorkerRequestHandler; /** * 处理 Worker 请求 @@ -42,9 +42,9 @@ public class WorkerRequestAkkaHandler extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() - .match(WorkerHeartbeat.class, hb -> getWorkerRequestHandler().onReceiveWorkerHeartbeat(hb)) + .match(WorkerHeartbeat.class, hb -> fetchWorkerRequestHandler().processWorkerHeartbeat(hb)) .match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq) - .match(WorkerLogReportReq.class, req -> getWorkerRequestHandler().onReceiveWorkerLogReportReq(req)) + .match(WorkerLogReportReq.class, req -> fetchWorkerRequestHandler().processWorkerLogReport(req)) .match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest) .match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq) .matchAny(obj -> log.warn("[WorkerRequestAkkaHandler] receive unknown request: {}.", obj)) @@ -71,7 +71,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor { private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) { try { - Optional askResponseOpt = getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req); + Optional askResponseOpt = fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req); if (askResponseOpt.isPresent()) { getSender().tell(AskResponse.succeed(null), getSelf()); } @@ -85,7 +85,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor { * @param req 容器部署请求 */ private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) { - getSender().tell(getWorkerRequestHandler().onReceiveWorkerNeedDeployContainerRequest(req), getSelf()); + getSender().tell(fetchWorkerRequestHandler().processWorkerNeedDeployContainer(req), getSelf()); } /** @@ -94,7 +94,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor { */ private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) { - getSender().tell(getWorkerRequestHandler().onReceiveWorkerQueryExecutorClusterReq(req), getSelf()); + getSender().tell(fetchWorkerRequestHandler().processWorkerQueryExecutorCluster(req), getSelf()); } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestHttpHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestHttpHandler.java index 615dab2c..3749158f 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestHttpHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestHttpHandler.java @@ -21,7 +21,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import java.util.Properties; -import static tech.powerjob.server.core.handler.WorkerRequestHandler.getWorkerRequestHandler; +import static tech.powerjob.server.core.handler.WorkerRequestHandlerHolder.fetchWorkerRequestHandler; /** * WorkerRequestHandler @@ -46,14 +46,14 @@ public class WorkerRequestHttpHandler extends AbstractVerticle { router.post(ProtocolConstant.SERVER_PATH_HEARTBEAT) .handler(ctx -> { WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class); - getWorkerRequestHandler().onReceiveWorkerHeartbeat(heartbeat); + fetchWorkerRequestHandler().processWorkerHeartbeat(heartbeat); success(ctx); }); router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT) .blockingHandler(ctx -> { TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class); try { - getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req); + fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req); out(ctx, AskResponse.succeed(null)); } catch (Exception e) { log.error("[WorkerRequestHttpHandler] update instance status failed for request: {}.", req, e); @@ -63,7 +63,7 @@ public class WorkerRequestHttpHandler extends AbstractVerticle { router.post(ProtocolConstant.SERVER_PATH_LOG_REPORT) .blockingHandler(ctx -> { WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class); - getWorkerRequestHandler().onReceiveWorkerLogReportReq(req); + fetchWorkerRequestHandler().processWorkerLogReport(req); success(ctx); }); server.requestHandler(router).listen(port); diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/handler/TtReportInstanceStatusEvent.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/handler/TtReportInstanceStatusEvent.java deleted file mode 100644 index 567762b4..00000000 --- a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/handler/TtReportInstanceStatusEvent.java +++ /dev/null @@ -1,26 +0,0 @@ -package tech.powerjob.server.monitor.events.handler; - -import lombok.Setter; -import lombok.experimental.Accessors; -import tech.powerjob.server.monitor.Event; - -/** - * TaskTrackerReportInstanceStatus - * - * @author tjq - * @since 2022/9/9 - */ -@Setter -@Accessors(chain = true) -public class TtReportInstanceStatusEvent implements Event { - - @Override - public String type() { - return "MONITOR_LOGGER_TT_REPORT_STATUS"; - } - - @Override - public String message() { - return null; - } -} diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/TtReportInstanceStatusEvent.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/TtReportInstanceStatusEvent.java new file mode 100644 index 00000000..6eef0faf --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/TtReportInstanceStatusEvent.java @@ -0,0 +1,46 @@ +package tech.powerjob.server.monitor.events.w2s; + +import lombok.Setter; +import lombok.experimental.Accessors; +import tech.powerjob.server.common.SJ; +import tech.powerjob.server.monitor.Event; + +/** + * TaskTrackerReportInstanceStatus + * + * @author tjq + * @since 2022/9/9 + */ +@Setter +@Accessors(chain = true) +public class TtReportInstanceStatusEvent implements Event { + + private Long appId; + private Long jobId; + private Long instanceId; + + private Long wfInstanceId; + + private int instanceStatus; + + private Long delayMs; + + private Status serverProcessStatus; + + private Long serverProcessCost; + + public enum Status { + SUCCESS, + FAILED + } + + @Override + public String type() { + return "MONITOR_LOGGER_TT_REPORT_STATUS"; + } + + @Override + public String message() { + return SJ.MONITOR_JOINER.join(appId, jobId, instanceId, wfInstanceId, instanceStatus, delayMs, serverProcessStatus, serverProcessCost); + } +} diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/handler/WorkerHeartbeatEvent.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerHeartbeatEvent.java similarity index 81% rename from powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/handler/WorkerHeartbeatEvent.java rename to powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerHeartbeatEvent.java index 9dd3cff0..d2686dd8 100644 --- a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/handler/WorkerHeartbeatEvent.java +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerHeartbeatEvent.java @@ -1,4 +1,4 @@ -package tech.powerjob.server.monitor.events.handler; +package tech.powerjob.server.monitor.events.w2s; import lombok.Setter; import lombok.experimental.Accessors; @@ -26,7 +26,10 @@ public class WorkerHeartbeatEvent implements Event { private String tag; private String workerAddress; - + /** + * worker 上报时间与 server 之间的延迟 + */ + private long delayMs; private Integer score; @Override @@ -36,6 +39,6 @@ public class WorkerHeartbeatEvent implements Event { @Override public String message() { - return SJ.MONITOR_JOINER.join(appName, appId, version, protocol, tag, workerAddress, score); + return SJ.MONITOR_JOINER.join(appName, appId, version, protocol, tag, workerAddress, delayMs, score); } } diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerLogReportEvent.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerLogReportEvent.java new file mode 100644 index 00000000..89bec99d --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerLogReportEvent.java @@ -0,0 +1,42 @@ +package tech.powerjob.server.monitor.events.w2s; + +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import tech.powerjob.server.common.SJ; +import tech.powerjob.server.monitor.Event; + +/** + * description + * + * @author tjq + * @since 2022/9/11 + */ +@Setter +@Accessors(chain = true) +public class WorkerLogReportEvent implements Event { + + private String workerAddress; + + private long logSize; + + private Status status; + + private long serverCost; + + public enum Status { + SUCCESS, + REJECTED, + EXCEPTION + } + + @Override + public String type() { + return "MONITOR_LOGGER_WORKER_LOG_REPORT"; + } + + @Override + public String message() { + return SJ.MONITOR_JOINER.join(workerAddress, logSize, status, serverCost); + } +}