From 57501075de26e93bc13319b94dc2d431f7b27961 Mon Sep 17 00:00:00 2001 From: tjq Date: Mon, 8 Feb 2021 20:03:35 +0800 Subject: [PATCH] feat: add WorkerRequestHttpHandler powered by vert.x --- .../kfcfans/powerjob/common/OmsConstant.java | 3 + powerjob-server/pom.xml | 7 ++ .../powerjob/server/OhMyApplication.java | 9 ++- .../common/PowerJobServerConfigKey.java | 6 +- .../redirect/DesignateServerAspect.java | 8 +-- .../inner/FriendRequestHandler.java} | 10 +-- .../FriendQueryWorkerClusterStatusReq.java | 2 +- .../akka => handler/inner}/requests/Ping.java | 2 +- .../inner}/requests/RemoteProcessReq.java | 2 +- .../outer/WorkerRequestAkkaHandler.java | 68 +++++++++++++++++++ .../outer/WorkerRequestHandler.java} | 64 ++++++++--------- .../outer/WorkerRequestHttpHandler.java | 68 +++++++++++++++++++ .../server/service/ContainerService.java | 6 +- .../server/service/DispatchService.java | 2 +- .../service/ha/ClusterStatusHolder.java | 4 ++ .../service/ha/ServerSelectService.java | 12 ++-- .../service/ha/WorkerManagerService.java | 14 ++-- .../service/instance/InstanceService.java | 57 ++++++++-------- .../timing/InstanceStatusCheckService.java | 4 +- .../timing/schedule/OmsScheduleService.java | 4 +- .../server/transport/TransportService.java | 15 +++- .../server/transport/Transporter.java | 5 +- .../transport/akka/AkkaTransporter.java | 33 --------- .../actors/ServerTroubleshootingActor.java | 25 ------- .../transport/impl/AkkaTransporter.java | 48 +++++++++++++ .../AkkaStarter.java} | 33 ++++----- .../transport/starter/VertXStarter.java | 47 +++++++++++++ .../web/controller/ContainerController.java | 4 +- .../web/controller/ServerController.java | 4 +- .../web/controller/SystemInfoController.java | 7 +- .../src/main/resources/application.properties | 1 + 31 files changed, 392 insertions(+), 182 deletions(-) rename powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/{transport/akka/actors/FriendActor.java => handler/inner/FriendRequestHandler.java} (89%) rename powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/{transport/akka => handler/inner}/requests/FriendQueryWorkerClusterStatusReq.java (84%) rename powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/{transport/akka => handler/inner}/requests/Ping.java (78%) rename powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/{transport/akka => handler/inner}/requests/RemoteProcessReq.java (86%) create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestAkkaHandler.java rename powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/{transport/akka/actors/ServerActor.java => handler/outer/WorkerRequestHandler.java} (67%) create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHttpHandler.java delete mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/AkkaTransporter.java delete mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/actors/ServerTroubleshootingActor.java create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/impl/AkkaTransporter.java rename powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/{akka/OhMyServer.java => starter/AkkaStarter.java} (77%) create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/starter/VertXStarter.java diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsConstant.java index a2a79fd4..79308013 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsConstant.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsConstant.java @@ -8,6 +8,9 @@ package com.github.kfcfans.powerjob.common; */ public class OmsConstant { + public static final int SERVER_DEFAULT_AKKA_PORT = 10086; + public static final int SERVER_DEFAULT_HTTP_PORT = 10010; + public static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; public static final String TIME_PATTERN_PLUS = "yyyy-MM-dd HH:mm:ss.SSS"; diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index e4e844dc..e0cdb822 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -31,6 +31,7 @@ 3.6 1.2.68 1.0.1 + 4.0.2 true @@ -184,6 +185,12 @@ + + io.vertx + vertx-web + ${vertx-web.version} + + diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/OhMyApplication.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/OhMyApplication.java index cb01ddaf..9b944bf7 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/OhMyApplication.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/OhMyApplication.java @@ -1,6 +1,8 @@ package com.github.kfcfans.powerjob.server; -import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer; +import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils; +import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter; +import com.github.kfcfans.powerjob.server.transport.starter.VertXStarter; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -28,8 +30,8 @@ public class OhMyApplication { pre(); - // Init ActorSystem first - OhMyServer.init(); + AkkaStarter.init(); + VertXStarter.init(); // Start SpringBoot application. try { @@ -42,6 +44,7 @@ public class OhMyApplication { private static void pre() { log.info(TIPS); + PropertyUtils.init(); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java index 5097a152..adec56e4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java @@ -9,9 +9,13 @@ package com.github.kfcfans.powerjob.server.common; public class PowerJobServerConfigKey { /** - * akka 端口号 + * akka 协议端口号 */ public static final String AKKA_PORT = "oms.akka.port"; + /** + * http 协议端口号 + */ + public static final String HTTP_PORT = "oms.http.port"; /** * 自定义数据库表前缀 */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServerAspect.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServerAspect.java index a50edc7e..5202862d 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServerAspect.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServerAspect.java @@ -5,8 +5,8 @@ import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.response.AskResponse; -import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer; -import com.github.kfcfans.powerjob.server.transport.akka.requests.RemoteProcessReq; +import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter; +import com.github.kfcfans.powerjob.server.handler.inner.requests.RemoteProcessReq; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import lombok.extern.slf4j.Slf4j; @@ -70,7 +70,7 @@ public class DesignateServerAspect { String targetServer = appInfo.getCurrentServer(); // 目标IP与本地符合则本地执行 - if (Objects.equals(targetServer, OhMyServer.getActorSystemAddress())) { + if (Objects.equals(targetServer, AkkaStarter.getActorSystemAddress())) { return point.proceed(); } @@ -82,7 +82,7 @@ public class DesignateServerAspect { .setParameterTypes(parameterTypes) .setArgs(args); - CompletionStage askCS = Patterns.ask(OhMyServer.getFriendActor(targetServer), remoteProcessReq, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + CompletionStage askCS = Patterns.ask(AkkaStarter.getFriendActor(targetServer), remoteProcessReq, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(); if (!askResponse.isSuccess()) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/actors/FriendActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/inner/FriendRequestHandler.java similarity index 89% rename from powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/actors/FriendActor.java rename to powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/inner/FriendRequestHandler.java index 831e5390..ea4ba19d 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/actors/FriendActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/inner/FriendRequestHandler.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.powerjob.server.transport.akka.actors; +package com.github.kfcfans.powerjob.server.handler.inner; import akka.actor.AbstractActor; import com.alibaba.fastjson.JSONObject; @@ -6,9 +6,9 @@ import com.github.kfcfans.powerjob.common.model.WorkerInfo; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; -import com.github.kfcfans.powerjob.server.transport.akka.requests.FriendQueryWorkerClusterStatusReq; -import com.github.kfcfans.powerjob.server.transport.akka.requests.Ping; -import com.github.kfcfans.powerjob.server.transport.akka.requests.RemoteProcessReq; +import com.github.kfcfans.powerjob.server.handler.inner.requests.FriendQueryWorkerClusterStatusReq; +import com.github.kfcfans.powerjob.server.handler.inner.requests.Ping; +import com.github.kfcfans.powerjob.server.handler.inner.requests.RemoteProcessReq; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.util.ReflectionUtils; @@ -23,7 +23,7 @@ import java.util.Map; * @since 2020/4/9 */ @Slf4j -public class FriendActor extends AbstractActor { +public class FriendRequestHandler extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/requests/FriendQueryWorkerClusterStatusReq.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/inner/requests/FriendQueryWorkerClusterStatusReq.java similarity index 84% rename from powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/requests/FriendQueryWorkerClusterStatusReq.java rename to powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/inner/requests/FriendQueryWorkerClusterStatusReq.java index c489d69f..af772858 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/requests/FriendQueryWorkerClusterStatusReq.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/inner/requests/FriendQueryWorkerClusterStatusReq.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.powerjob.server.transport.akka.requests; +package com.github.kfcfans.powerjob.server.handler.inner.requests; import com.github.kfcfans.powerjob.common.OmsSerializable; import lombok.AllArgsConstructor; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/requests/Ping.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/inner/requests/Ping.java similarity index 78% rename from powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/requests/Ping.java rename to powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/inner/requests/Ping.java index 9ea2e122..62010c7a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/requests/Ping.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/inner/requests/Ping.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.powerjob.server.transport.akka.requests; +package com.github.kfcfans.powerjob.server.handler.inner.requests; import com.github.kfcfans.powerjob.common.OmsSerializable; import lombok.Data; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/requests/RemoteProcessReq.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/inner/requests/RemoteProcessReq.java similarity index 86% rename from powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/requests/RemoteProcessReq.java rename to powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/inner/requests/RemoteProcessReq.java index f9bc8218..ccd264be 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/requests/RemoteProcessReq.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/inner/requests/RemoteProcessReq.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.powerjob.server.transport.akka.requests; +package com.github.kfcfans.powerjob.server.handler.inner.requests; import com.github.kfcfans.powerjob.common.OmsSerializable; import lombok.Getter; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestAkkaHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestAkkaHandler.java new file mode 100644 index 00000000..c2ae96f8 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestAkkaHandler.java @@ -0,0 +1,68 @@ +package com.github.kfcfans.powerjob.server.handler.outer; + +import akka.actor.AbstractActor; +import com.github.kfcfans.powerjob.common.request.*; +import com.github.kfcfans.powerjob.common.response.AskResponse; +import lombok.extern.slf4j.Slf4j; + +import java.util.Optional; + +import static com.github.kfcfans.powerjob.server.handler.outer.WorkerRequestHandler.getWorkerRequestHandler; + +/** + * 处理 Worker 请求 + * + * @author tjq + * @since 2020/3/30 + */ +@Slf4j +public class WorkerRequestAkkaHandler extends AbstractActor { + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(WorkerHeartbeat.class, hb -> getWorkerRequestHandler().onReceiveWorkerHeartbeat(hb)) + .match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq) + .match(WorkerLogReportReq.class, req -> getWorkerRequestHandler().onReceiveWorkerLogReportReq(req)) + .match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest) + .match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq) + .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) + .build(); + } + + + + /** + * 处理 instance 状态 + * @param req 任务实例的状态上报请求 + */ + private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) { + + try { + Optional askResponseOpt = getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req); + if (askResponseOpt.isPresent()) { + getSender().tell(AskResponse.succeed(null), getSelf()); + } + }catch (Exception e) { + log.error("[ServerActor] update instance status failed for request: {}.", req, e); + } + } + + /** + * 处理 Worker容器部署请求 + * @param req 容器部署请求 + */ + private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) { + getSender().tell(getWorkerRequestHandler().onReceiveWorkerNeedDeployContainerRequest(req), getSelf()); + } + + /** + * 处理 worker 请求获取当前任务所有处理器节点的请求 + * @param req jobId + appId + */ + private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) { + + getSender().tell(getWorkerRequestHandler().onReceiveWorkerQueryExecutorClusterReq(req), getSelf()); + } + +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/actors/ServerActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHandler.java similarity index 67% rename from powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/actors/ServerActor.java rename to powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHandler.java index f17c1b54..b6a350e5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/actors/ServerActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHandler.java @@ -1,6 +1,5 @@ -package com.github.kfcfans.powerjob.server.transport.akka.actors; +package com.github.kfcfans.powerjob.server.handler.outer; -import akka.actor.AbstractActor; import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.model.WorkerInfo; import com.github.kfcfans.powerjob.common.request.*; @@ -19,40 +18,39 @@ import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.core.env.Environment; +import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; /** - * 处理 Worker 请求 + * receive and process worker's request * * @author tjq - * @since 2020/3/30 + * @since 2021/2/8 */ @Slf4j -public class ServerActor extends AbstractActor { +@Component +public class WorkerRequestHandler { + @Resource + private Environment environment; + @Resource private InstanceManager instanceManager; + @Resource + private InstanceLogService instanceLogService; + @Resource + private ContainerInfoRepository containerInfoRepository; - @Override - public Receive createReceive() { - return receiveBuilder() - .match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat) - .match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq) - .match(WorkerLogReportReq.class, this::onReceiveWorkerLogReportReq) - .match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest) - .match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq) - .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) - .build(); - } - + private static WorkerRequestHandler workerRequestHandler; /** * 处理 Worker 的心跳请求 * @param heartbeat 心跳包 */ - private void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) { + public void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) { WorkerManagerService.updateStatus(heartbeat); } @@ -60,36 +58,35 @@ public class ServerActor extends AbstractActor { * 处理 instance 状态 * @param req 任务实例的状态上报请求 */ - private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) { + public Optional onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) { try { - getInstanceManager().updateStatus(req); + instanceManager.updateStatus(req); // 结束状态(成功/失败)需要回复消息 if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) { - getSender().tell(AskResponse.succeed(null), getSelf()); + return Optional.of(AskResponse.succeed(null)); } }catch (Exception e) { log.error("[ServerActor] update instance status failed for request: {}.", req, e); } + return Optional.empty(); } /** * 处理OMS在线日志请求 * @param req 日志请求 */ - private void onReceiveWorkerLogReportReq(WorkerLogReportReq req) { + public void onReceiveWorkerLogReportReq(WorkerLogReportReq req) { // 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧... - SpringUtils.getBean(InstanceLogService.class).submitLogs(req.getWorkerAddress(), req.getInstanceLogContents()); + instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents()); } /** * 处理 Worker容器部署请求 * @param req 容器部署请求 */ - private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) { + public AskResponse onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) { - ContainerInfoRepository containerInfoRepository = SpringUtils.getBean(ContainerInfoRepository.class); - Environment environment = SpringUtils.getBean(Environment.class); String port = environment.getProperty("local.server.port"); Optional containerInfoOpt = containerInfoRepository.findById(req.getContainerId()); @@ -109,14 +106,14 @@ public class ServerActor extends AbstractActor { askResponse.setData(JsonUtils.toBytes(dpReq)); } - getSender().tell(askResponse, getSelf()); + return askResponse; } /** * 处理 worker 请求获取当前任务所有处理器节点的请求 * @param req jobId + appId */ - private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) { + public AskResponse onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) { AskResponse askResponse; @@ -137,14 +134,13 @@ public class ServerActor extends AbstractActor { }else { askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId); } - getSender().tell(askResponse, getSelf()); + return askResponse; } - // 不需要加锁,从 Spring IOC 中重复取并没什么问题 - private InstanceManager getInstanceManager() { - if (instanceManager == null) { - instanceManager = SpringUtils.getBean(InstanceManager.class); + public static WorkerRequestHandler getWorkerRequestHandler() { + if (workerRequestHandler == null) { + workerRequestHandler = SpringUtils.getBean(WorkerRequestHandler.class); } - return instanceManager; + return workerRequestHandler; } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHttpHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHttpHandler.java new file mode 100644 index 00000000..e8511dc5 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHttpHandler.java @@ -0,0 +1,68 @@ +package com.github.kfcfans.powerjob.server.handler.outer; + +import com.github.kfcfans.powerjob.common.OmsConstant; +import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq; +import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat; +import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq; +import com.github.kfcfans.powerjob.common.response.AskResponse; +import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; +import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.handler.BodyHandler; + +import java.util.Optional; +import java.util.Properties; + +import static com.github.kfcfans.powerjob.server.handler.outer.WorkerRequestHandler.getWorkerRequestHandler; + +/** + * WorkerRequestHandler + * + * @author tjq + * @since 2021/2/8 + */ +public class WorkerRequestHttpHandler extends AbstractVerticle { + + private static final String HTTP_PREFIX = "/wrh/v1/"; + + @Override + public void start() throws Exception { + + Properties properties = PropertyUtils.getProperties(); + int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT))); + + HttpServerOptions options = new HttpServerOptions(); + HttpServer server = vertx.createHttpServer(options); + + Router router = Router.router(vertx); + router.route().handler(BodyHandler.create()); + router.post(HTTP_PREFIX + "heartbeat") + .handler(ctx -> { + WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class); + getWorkerRequestHandler().onReceiveWorkerHeartbeat(heartbeat); + }); + router.post(HTTP_PREFIX + "instanceStatusReport") + .blockingHandler(ctx -> { + TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class); + Optional askResponseOpt = getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req); + askResponseOpt.ifPresent(askResponse -> out(ctx, askResponse)); + }); + router.post(HTTP_PREFIX + "logReport") + .blockingHandler(ctx -> { + WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class); + getWorkerRequestHandler().onReceiveWorkerLogReportReq(req); + }); + server.requestHandler(router).listen(port); + } + + private static void out(RoutingContext ctx, Object msg) { + ctx.response() + .putHeader("Content-Type", OmsConstant.JSON_MEDIA_TYPE) + .end(JsonObject.mapFrom(msg).encode()); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java index b628ae31..7ba8cbd3 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java @@ -10,7 +10,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.common.utils.SegmentLock; -import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer; +import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter; import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils; @@ -126,7 +126,7 @@ public class ContainerService { ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId()); WorkerManagerService.getActiveWorkerInfo(container.getAppId()).keySet().forEach(akkaAddress -> { - ActorSelection workerActor = OhMyServer.getWorkerActor(akkaAddress); + ActorSelection workerActor = AkkaStarter.getWorkerActor(akkaAddress); workerActor.tell(destroyRequest, null); }); @@ -260,7 +260,7 @@ public class ContainerService { AtomicInteger count = new AtomicInteger(); workerAddressList.forEach(akkaAddress -> { - ActorSelection workerActor = OhMyServer.getWorkerActor(akkaAddress); + ActorSelection workerActor = AkkaStarter.getWorkerActor(akkaAddress); workerActor.tell(req, null); remote.sendText("SYSTEM: send deploy request to " + akkaAddress); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index 55b55b10..8c24b544 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -165,7 +165,7 @@ public class DispatchService { WorkerInfo taskTracker = allAvailableWorker.get(0); String taskTrackerAddress = taskTracker.getAddress(); - transportService.transfer(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req); + transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req); log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req); // 修改状态 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ClusterStatusHolder.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ClusterStatusHolder.java index 7d1a33c2..0a448f74 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ClusterStatusHolder.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ClusterStatusHolder.java @@ -85,6 +85,10 @@ public class ClusterStatusHolder { return workers; } + public WorkerInfo getWorkerInfo(String address) { + return address2WorkerInfo.get(address); + } + public List getAvailableWorkers(double minCPUCores, double minMemorySpace, double minDiskSpace) { List workerInfos = Lists.newArrayList(); address2WorkerInfo.forEach((address, workerInfo) -> { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java index f12e7be2..996c45bc 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java @@ -4,8 +4,8 @@ import akka.actor.ActorSelection; import akka.pattern.Patterns; import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.response.AskResponse; -import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer; -import com.github.kfcfans.powerjob.server.transport.akka.requests.Ping; +import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter; +import com.github.kfcfans.powerjob.server.handler.inner.requests.Ping; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.extension.LockService; @@ -50,7 +50,7 @@ public class ServerSelectService { public String getServer(Long appId, String currentServer) { if (!accurate()) { // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功 - if (OhMyServer.getActorSystemAddress().equals(currentServer)) { + if (AkkaStarter.getActorSystemAddress().equals(currentServer)) { return currentServer; } } @@ -93,7 +93,7 @@ public class ServerSelectService { } // 篡位,本机作为Server - appInfo.setCurrentServer(OhMyServer.getActorSystemAddress()); + appInfo.setCurrentServer(AkkaStarter.getActorSystemAddress()); appInfo.setGmtModified(new Date()); appInfoRepository.saveAndFlush(appInfo); @@ -123,14 +123,14 @@ public class ServerSelectService { return false; } - if (OhMyServer.getActorSystemAddress().equals(serverAddress)) { + if (AkkaStarter.getActorSystemAddress().equals(serverAddress)) { return true; } Ping ping = new Ping(); ping.setCurrentTime(System.currentTimeMillis()); - ActorSelection serverActor = OhMyServer.getFriendActor(serverAddress); + ActorSelection serverActor = AkkaStarter.getFriendActor(serverAddress); try { CompletionStage askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS)); AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/WorkerManagerService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/WorkerManagerService.java index 80582570..5a2877a5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/WorkerManagerService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/WorkerManagerService.java @@ -7,10 +7,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; /** * Worker 管理服务 @@ -56,6 +53,15 @@ public class WorkerManagerService { return clusterStatusHolder.getAvailableWorkers(minCPUCores, minMemorySpace, minDiskSpace); } + public static Optional getWorkerInfo(Long appId, String address) { + ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); + if (clusterStatusHolder == null) { + log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId); + return Optional.empty(); + } + return Optional.ofNullable(clusterStatusHolder.getWorkerInfo(address)); + } + /** * 清理不需要的worker信息 * @param usingAppIds 需要维护的appId,其余的数据将被删除 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java index a8eb8e68..660f3b6f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java @@ -1,14 +1,12 @@ package com.github.kfcfans.powerjob.server.service.instance; -import akka.actor.ActorSelection; -import akka.pattern.Patterns; import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.model.InstanceDetail; +import com.github.kfcfans.powerjob.common.model.WorkerInfo; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.InstanceInfoDTO; -import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer; import com.github.kfcfans.powerjob.server.common.constans.InstanceType; import com.github.kfcfans.powerjob.server.common.redirect.DesignateServer; import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils; @@ -18,17 +16,17 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.service.DispatchService; +import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; import com.github.kfcfans.powerjob.server.service.id.IdGenerateService; +import com.github.kfcfans.powerjob.server.transport.TransportService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import javax.annotation.Resource; -import java.time.Duration; import java.util.Date; import java.util.List; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; +import java.util.Optional; import java.util.stream.Collectors; import static com.github.kfcfans.powerjob.common.InstanceStatus.RUNNING; @@ -44,6 +42,8 @@ import static com.github.kfcfans.powerjob.common.InstanceStatus.STOPPED; @Service public class InstanceService { + @Resource + private TransportService transportService; @Resource private DispatchService dispatchService; @Resource @@ -115,11 +115,15 @@ public class InstanceService { 不可靠通知停止 TaskTracker 假如没有成功关闭,之后 TaskTracker 会再次 reportStatus,按照流程,instanceLog 会被更新为 RUNNING,开发者可以再次手动关闭 */ - ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(instanceInfo.getTaskTrackerAddress()); - ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId); - taskTrackerActor.tell(req, null); - - log.info("[Instance-{}] update instanceInfo and send request succeed.", instanceId); + Optional workerInfoOpt = WorkerManagerService.getWorkerInfo(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress()); + if (workerInfoOpt.isPresent()) { + ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId); + WorkerInfo workerInfo = workerInfoOpt.get(); + transportService.tell(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), req); + log.info("[Instance-{}] update instanceInfo and send 'stopInstance' request succeed.", instanceId); + } else { + log.warn("[Instance-{}] update instanceInfo successfully but can't find TaskTracker to stop instance", instanceId); + } }catch (IllegalArgumentException ie) { throw ie; @@ -248,24 +252,23 @@ public class InstanceService { return detail; } - // 运行状态下,交由 TaskTracker 返回相关信息 - try { + Optional workerInfoOpt = WorkerManagerService.getWorkerInfo(instanceInfoDO.getAppId(), instanceInfoDO.getTaskTrackerAddress()); + if (workerInfoOpt.isPresent()) { + WorkerInfo workerInfo = workerInfoOpt.get(); ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId); - ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(instanceInfoDO.getTaskTrackerAddress()); - CompletionStage askCS = Patterns.ask(taskTrackerActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); - AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - - if (askResponse.isSuccess()) { - InstanceDetail instanceDetail = askResponse.getData(InstanceDetail.class); - instanceDetail.setRunningTimes(instanceInfoDO.getRunningTimes()); - instanceDetail.setInstanceParams(instanceInfoDO.getInstanceParams()); - return instanceDetail; - }else { - log.warn("[Instance-{}] ask InstanceStatus from TaskTracker failed, the message is {}.", instanceId, askResponse.getMessage()); + try { + AskResponse askResponse = transportService.ask(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), req); + if (askResponse.isSuccess()) { + InstanceDetail instanceDetail = askResponse.getData(InstanceDetail.class); + instanceDetail.setRunningTimes(instanceInfoDO.getRunningTimes()); + instanceDetail.setInstanceParams(instanceInfoDO.getInstanceParams()); + return instanceDetail; + }else { + log.warn("[Instance-{}] ask InstanceStatus from TaskTracker failed, the message is {}.", instanceId, askResponse.getMessage()); + } + } catch (Exception e) { + log.warn("[Instance-{}] ask InstanceStatus from TaskTracker failed, exception is {}", instanceId, e.toString()); } - - }catch (Exception e) { - log.warn("[Instance-{}] ask InstanceStatus from TaskTracker failed, exception is {}", instanceId, e.toString()); } // 失败则返回基础版信息 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java index ad4cd735..785d0cff 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java @@ -5,7 +5,7 @@ import com.github.kfcfans.powerjob.common.SystemInstanceResult; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; -import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer; +import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter; import com.github.kfcfans.powerjob.server.persistence.core.model.*; import com.github.kfcfans.powerjob.server.persistence.core.repository.*; import com.github.kfcfans.powerjob.server.service.DispatchService; @@ -65,7 +65,7 @@ public class InstanceStatusCheckService { Stopwatch stopwatch = Stopwatch.createStarted(); // 查询DB获取该Server需要负责的AppGroup - List appInfoList = appInfoRepository.findAllByCurrentServer(OhMyServer.getActorSystemAddress()); + List appInfoList = appInfoRepository.findAllByCurrentServer(AkkaStarter.getActorSystemAddress()); if (CollectionUtils.isEmpty(appInfoList)) { log.info("[InstanceStatusChecker] current server has no app's job to check"); return; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index 63bff026..ef5b431a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.timing.schedule; import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.TimeExpressionType; -import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer; +import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.CronExpression; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; @@ -76,7 +76,7 @@ public class OmsScheduleService { Stopwatch stopwatch = Stopwatch.createStarted(); // 先查询DB,查看本机需要负责的任务 - List allAppInfos = appInfoRepository.findAllByCurrentServer(OhMyServer.getActorSystemAddress()); + List allAppInfos = appInfoRepository.findAllByCurrentServer(AkkaStarter.getActorSystemAddress()); if (CollectionUtils.isEmpty(allAppInfos)) { log.info("[JobScheduleService] current server has no app's job to schedule."); return; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/TransportService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/TransportService.java index 40743486..b3f9c926 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/TransportService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/TransportService.java @@ -2,11 +2,13 @@ package com.github.kfcfans.powerjob.server.transport; import com.github.kfcfans.powerjob.common.OmsSerializable; import com.github.kfcfans.powerjob.common.Protocol; +import com.github.kfcfans.powerjob.common.response.AskResponse; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.io.IOException; import java.util.List; import java.util.Map; @@ -30,12 +32,21 @@ public class TransportService { }); } - public void transfer(Protocol protocol, String address, OmsSerializable object) { + public void tell(Protocol protocol, String address, OmsSerializable object) { Transporter transporter = protocol2Transporter.get(protocol); if (transporter == null) { log.error("[TransportService] can't find transporter by protocol[{}], this is a bug!", protocol); return; } - transporter.transfer(address, object); + transporter.tell(address, object); + } + + public AskResponse ask(Protocol protocol, String address, OmsSerializable object) throws Exception { + Transporter transporter = protocol2Transporter.get(protocol); + if (transporter == null) { + log.error("[TransportService] can't find transporter by protocol[{}], this is a bug!", protocol); + throw new IOException("can't find transporter by protocol: " + protocol); + } + return transporter.ask(address, object); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/Transporter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/Transporter.java index a1d920a6..4dedc78c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/Transporter.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/Transporter.java @@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.transport; import com.github.kfcfans.powerjob.common.OmsSerializable; import com.github.kfcfans.powerjob.common.Protocol; +import com.github.kfcfans.powerjob.common.response.AskResponse; /** * Transporter @@ -15,5 +16,7 @@ public interface Transporter { String getAddress(); - void transfer(String address, OmsSerializable object); + void tell(String address, OmsSerializable object); + + AskResponse ask(String address, OmsSerializable object) throws Exception; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/AkkaTransporter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/AkkaTransporter.java deleted file mode 100644 index 96af264d..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/AkkaTransporter.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.github.kfcfans.powerjob.server.transport.akka; - -import akka.actor.ActorSelection; -import com.github.kfcfans.powerjob.common.OmsSerializable; -import com.github.kfcfans.powerjob.common.Protocol; -import com.github.kfcfans.powerjob.server.transport.Transporter; -import org.springframework.stereotype.Service; - -/** - * akka transporter - * - * @author tjq - * @since 2021/2/7 - */ -@Service -public class AkkaTransporter implements Transporter { - - @Override - public Protocol getProtocol() { - return Protocol.AKKA; - } - - @Override - public String getAddress() { - return OhMyServer.getActorSystemAddress(); - } - - @Override - public void transfer(String address, OmsSerializable object) { - ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(address); - taskTrackerActor.tell(object, null); - } -} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/actors/ServerTroubleshootingActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/actors/ServerTroubleshootingActor.java deleted file mode 100644 index 79adc257..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/actors/ServerTroubleshootingActor.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.github.kfcfans.powerjob.server.transport.akka.actors; - -import akka.actor.AbstractActor; -import akka.actor.DeadLetter; -import lombok.extern.slf4j.Slf4j; - -/** - * 处理 server 异常信息的 actor - * - * @author tjq - * @since 2020/7/18 - */ -@Slf4j -public class ServerTroubleshootingActor extends AbstractActor { - @Override - public Receive createReceive() { - return receiveBuilder() - .match(DeadLetter.class, this::onReceiveDeadLetter) - .build(); - } - - public void onReceiveDeadLetter(DeadLetter dl) { - log.warn("[ServerTroubleshootingActor] receive DeadLetter: {}", dl); - } -} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/impl/AkkaTransporter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/impl/AkkaTransporter.java new file mode 100644 index 00000000..71f8ec39 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/impl/AkkaTransporter.java @@ -0,0 +1,48 @@ +package com.github.kfcfans.powerjob.server.transport.impl; + +import akka.actor.ActorSelection; +import akka.pattern.Patterns; +import com.github.kfcfans.powerjob.common.OmsSerializable; +import com.github.kfcfans.powerjob.common.Protocol; +import com.github.kfcfans.powerjob.common.RemoteConstant; +import com.github.kfcfans.powerjob.common.response.AskResponse; +import com.github.kfcfans.powerjob.server.transport.Transporter; +import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +/** + * akka transporter + * + * @author tjq + * @since 2021/2/7 + */ +@Service +public class AkkaTransporter implements Transporter { + + @Override + public Protocol getProtocol() { + return Protocol.AKKA; + } + + @Override + public String getAddress() { + return AkkaStarter.getActorSystemAddress(); + } + + @Override + public void tell(String address, OmsSerializable object) { + ActorSelection taskTrackerActor = AkkaStarter.getTaskTrackerActor(address); + taskTrackerActor.tell(object, null); + } + + @Override + public AskResponse ask(String address, OmsSerializable object) throws Exception { + ActorSelection taskTrackerActor = AkkaStarter.getTaskTrackerActor(address); + CompletionStage askCS = Patterns.ask(taskTrackerActor, object, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + return (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/OhMyServer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/starter/AkkaStarter.java similarity index 77% rename from powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/OhMyServer.java rename to powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/starter/AkkaStarter.java index 2af7afea..d8a39177 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/akka/OhMyServer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/starter/AkkaStarter.java @@ -1,17 +1,19 @@ -package com.github.kfcfans.powerjob.server.transport.akka; +package com.github.kfcfans.powerjob.server.transport.starter; -import akka.actor.*; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; import akka.pattern.Patterns; import akka.routing.RoundRobinPool; +import com.github.kfcfans.powerjob.common.OmsConstant; import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.utils.NetUtils; -import com.github.kfcfans.powerjob.server.transport.akka.actors.FriendActor; -import com.github.kfcfans.powerjob.server.transport.akka.actors.ServerActor; -import com.github.kfcfans.powerjob.server.transport.akka.actors.ServerTroubleshootingActor; import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils; +import com.github.kfcfans.powerjob.server.handler.inner.FriendRequestHandler; +import com.github.kfcfans.powerjob.server.handler.outer.WorkerRequestAkkaHandler; import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -32,7 +34,7 @@ import java.util.concurrent.CompletionStage; * @since 2020/4/2 */ @Slf4j -public class OhMyServer { +public class AkkaStarter { public static ActorSystem actorSystem; @Getter @@ -43,18 +45,17 @@ public class OhMyServer { public static void init() { Stopwatch stopwatch = Stopwatch.createStarted(); - log.info("[OhMyServer] OhMyServer's akka system start to bootstrap..."); + log.info("[PowerJob] PowerJob's akka system start to bootstrap..."); // 忽略了一个问题,机器是没办法访问外网的,除非架设自己的NTP服务器 // TimeUtils.check(); // 解析配置文件 - PropertyUtils.init(); Properties properties = PropertyUtils.getProperties(); - int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.AKKA_PORT, "10086")); + int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.AKKA_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_AKKA_PORT))); String portFromJVM = System.getProperty(PowerJobServerConfigKey.AKKA_PORT); if (StringUtils.isNotEmpty(portFromJVM)) { - log.info("[OhMyWorker] use port from jvm params: {}", portFromJVM); + log.info("[PowerJob] use port from jvm params: {}", portFromJVM); port = Integer.parseInt(portFromJVM); } @@ -64,22 +65,18 @@ public class OhMyServer { overrideConfig.put("akka.remote.artery.canonical.hostname", localIP); overrideConfig.put("akka.remote.artery.canonical.port", port); actorSystemAddress = localIP + ":" + port; - log.info("[OhMyWorker] akka-remote server address: {}", actorSystemAddress); + log.info("[PowerJob] akka-remote server address: {}", actorSystemAddress); Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.SERVER_AKKA_CONFIG_NAME); Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig); - actorSystem.actorOf(Props.create(ServerActor.class) + actorSystem.actorOf(Props.create(WorkerRequestAkkaHandler.class) .withDispatcher("akka.server-actor-dispatcher") .withRouter(new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)), RemoteConstant.SERVER_ACTOR_NAME); - actorSystem.actorOf(Props.create(FriendActor.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME); + actorSystem.actorOf(Props.create(FriendRequestHandler.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME); - // 处理系统中产生的异常情况 - ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(ServerTroubleshootingActor.class), RemoteConstant.SERVER_TROUBLESHOOTING_ACTOR_NAME); - actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class); - - log.info("[OhMyServer] OhMyServer's akka system start successfully, using time {}.", stopwatch); + log.info("[PowerJob] PowerJob's akka system started successfully, using time {}.", stopwatch); } /** diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/starter/VertXStarter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/starter/VertXStarter.java new file mode 100644 index 00000000..9cd6e9d3 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/starter/VertXStarter.java @@ -0,0 +1,47 @@ +package com.github.kfcfans.powerjob.server.transport.starter; + +import com.github.kfcfans.powerjob.common.OmsConstant; +import com.github.kfcfans.powerjob.common.utils.NetUtils; +import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; +import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils; +import com.github.kfcfans.powerjob.server.handler.outer.WorkerRequestHttpHandler; +import com.google.common.base.Stopwatch; +import io.vertx.core.Vertx; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.util.Properties; + +/** + * vert.x starter + * + * @author tjq + * @since 2021/2/8 + */ +@Slf4j +public class VertXStarter { + + @Getter + private static String address; + + public static void init() { + Stopwatch stopwatch = Stopwatch.createStarted(); + log.info("[PowerJob] PowerJob's vert.x system start to bootstrap..."); + + Properties properties = PropertyUtils.getProperties(); + int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT))); + String portFromJVM = System.getProperty(PowerJobServerConfigKey.HTTP_PORT); + if (StringUtils.isNotEmpty(portFromJVM)) { + port = Integer.parseInt(portFromJVM); + } + String localIP = NetUtils.getLocalHost(); + + address = localIP + ":" + port; + log.info("[PowerJob] vert.x server address: {}", address); + + Vertx.vertx().deployVerticle(new WorkerRequestHttpHandler()); + + log.info("[PowerJob] PowerJob's vert.x system started successfully, using time {}.", stopwatch); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ContainerController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ContainerController.java index 18f7e444..c6b24d92 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ContainerController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ContainerController.java @@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.web.controller; import com.github.kfcfans.powerjob.common.OmsConstant; import com.github.kfcfans.powerjob.common.response.ResultDTO; -import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer; +import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter; import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.ContainerTemplateGenerator; @@ -102,7 +102,7 @@ public class ContainerController { } // 转发 HTTP 请求 - if (!OhMyServer.getActorSystemAddress().equals(targetServer)) { + if (!AkkaStarter.getActorSystemAddress().equals(targetServer)) { String targetIp = targetServer.split(":")[0]; String url = String.format("http://%s:%d/container/listDeployedWorker?appId=%d&containerId=%d", targetIp, port, appId, containerId); try { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java index 4ffe4b60..742c68d6 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java @@ -5,7 +5,7 @@ import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; -import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer; +import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.service.ha.ServerSelectService; @@ -51,7 +51,7 @@ public class ServerController { public ResultDTO ping(@RequestParam(required = false) boolean debug) { JSONObject res = new JSONObject(); res.put("localHost", NetUtils.getLocalHost()); - res.put("actorSystemAddress", OhMyServer.getActorSystemAddress()); + res.put("actorSystemAddress", AkkaStarter.getActorSystemAddress()); res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis())); res.put("serverTimeZone", TimeZone.getDefault().getDisplayName()); res.put("appIds", WorkerManagerService.getAppId2ClusterStatus().keySet()); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java index 0f47bb49..d2fcb1b4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java @@ -5,13 +5,12 @@ import akka.pattern.Patterns; import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.OmsConstant; import com.github.kfcfans.powerjob.common.RemoteConstant; -import com.github.kfcfans.powerjob.common.model.SystemMetrics; import com.github.kfcfans.powerjob.common.model.WorkerInfo; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.utils.JsonUtils; -import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer; -import com.github.kfcfans.powerjob.server.transport.akka.requests.FriendQueryWorkerClusterStatusReq; +import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter; +import com.github.kfcfans.powerjob.server.handler.inner.requests.FriendQueryWorkerClusterStatusReq; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; @@ -69,7 +68,7 @@ public class SystemInfoController { // 重定向到指定 Server 获取集群信息 FriendQueryWorkerClusterStatusReq req = new FriendQueryWorkerClusterStatusReq(appId); try { - ActorSelection friendActor = OhMyServer.getFriendActor(server); + ActorSelection friendActor = AkkaStarter.getFriendActor(server); CompletionStage askCS = Patterns.ask(friendActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); diff --git a/powerjob-server/src/main/resources/application.properties b/powerjob-server/src/main/resources/application.properties index 002de73c..da2ab869 100644 --- a/powerjob-server/src/main/resources/application.properties +++ b/powerjob-server/src/main/resources/application.properties @@ -16,5 +16,6 @@ spring.servlet.multipart.max-request-size=209715200 ###### PowerJob self-owned configuration (The following properties should exist in application.properties only). ###### # Akka ActorSystem port. oms.akka.port=10086 +oms.http.port=10010 # Prefix for all tables. Default empty string. Config if you have needs, i.e. pj_ oms.table-prefix= \ No newline at end of file