From e74fc2d1387ba0db25a9744b33e813011e7a7c62 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 21 Jan 2023 10:34:37 +0800 Subject: [PATCH] chore: remove akka in common package --- powerjob-common/pom.xml | 13 --- powerjob-server/pom.xml | 12 --- .../handler/WorkerRequestHandlerHolder.java | 27 ----- .../impl/WorkerRequestAkkaHandler.java | 100 ------------------ .../impl/WorkerRequestHttpHandler.java | 81 -------------- 5 files changed, 233 deletions(-) delete mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java delete mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java delete mode 100644 powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestHttpHandler.java diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index dcd8d499..41387b46 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -19,7 +19,6 @@ 2.11.0 31.1-jre 3.14.9 - 2.6.20 5.3.0 2.14.0-rc1 5.9.0 @@ -54,18 +53,6 @@ ${okhttp.version} - - - com.typesafe.akka - akka-remote_2.13 - ${akka.version} - - - com.typesafe.akka - akka-slf4j_2.13 - ${akka.version} - - commons-io diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index dedfefba..b9cc9460 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -43,7 +43,6 @@ 3.8.0 1.2.83 1.0.1 - 4.0.2 true @@ -252,17 +251,6 @@ - - io.vertx - vertx-web - ${vertx-web.version} - - - io.vertx - vertx-web-client - ${vertx-web.version} - - com.cronutils cron-utils diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java deleted file mode 100644 index f9c267cc..00000000 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java +++ /dev/null @@ -1,27 +0,0 @@ -package tech.powerjob.server.core.handler; - -import org.springframework.stereotype.Component; - - -/** - * WorkerRequestHandlerHolder - * - * @author tjq - * @since 2022/9/11 - */ -@Component -public class WorkerRequestHandlerHolder { - - private static IWorkerRequestHandler workerRequestHandler; - - public WorkerRequestHandlerHolder(IWorkerRequestHandler injectedWorkerRequestHandler) { - workerRequestHandler = injectedWorkerRequestHandler; - } - - public static IWorkerRequestHandler fetchWorkerRequestHandler() { - if (workerRequestHandler == null){ - throw new IllegalStateException("WorkerRequestHandlerHolder not initialized!"); - } - return workerRequestHandler; - } -} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java deleted file mode 100644 index 26f18be0..00000000 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java +++ /dev/null @@ -1,100 +0,0 @@ -package tech.powerjob.server.core.handler.impl; - -import akka.actor.AbstractActor; -import akka.actor.Props; -import akka.routing.DefaultResizer; -import akka.routing.RoundRobinPool; -import tech.powerjob.common.request.*; -import tech.powerjob.common.response.AskResponse; -import lombok.extern.slf4j.Slf4j; - -import java.util.Optional; - -import static tech.powerjob.server.core.handler.WorkerRequestHandlerHolder.fetchWorkerRequestHandler; - -/** - * 处理 Worker 请求 - * - * @author tjq - * @since 2020/3/30 - */ -@Slf4j -public class WorkerRequestAkkaHandler extends AbstractActor { - - - public static Props defaultProps(){ - return Props.create(WorkerRequestAkkaHandler.class) - .withDispatcher("akka.w-r-c-d") - .withRouter( - new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4) - .withResizer(new DefaultResizer( - Runtime.getRuntime().availableProcessors() * 4, - Runtime.getRuntime().availableProcessors() * 10, - 1, - 0.2d, - 0.3d, - 0.1d, - 10 - )) - ); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .match(WorkerHeartbeat.class, hb -> fetchWorkerRequestHandler().processWorkerHeartbeat(hb)) - .match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq) - .match(WorkerLogReportReq.class, req -> fetchWorkerRequestHandler().processWorkerLogReport(req)) - .match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest) - .match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq) - .matchAny(obj -> log.warn("[WorkerRequestAkkaHandler] receive unknown request: {}.", obj)) - .build(); - } - - @Override - public void preStart() throws Exception { - super.preStart(); - log.debug("[WorkerRequestAkkaHandler]init WorkerRequestActor"); - } - - - @Override - public void postStop() throws Exception { - super.postStop(); - log.debug("[WorkerRequestAkkaHandler]stop WorkerRequestActor"); - } - - /** - * 处理 instance 状态 - * @param req 任务实例的状态上报请求 - */ - private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) { - - try { - Optional askResponseOpt = fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req); - if (askResponseOpt.isPresent()) { - getSender().tell(AskResponse.succeed(null), getSelf()); - } - }catch (Exception e) { - log.error("[WorkerRequestAkkaHandler] update instance status failed for request: {}.", req, e); - } - } - - /** - * 处理 Worker容器部署请求 - * @param req 容器部署请求 - */ - private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) { - getSender().tell(fetchWorkerRequestHandler().processWorkerNeedDeployContainer(req), getSelf()); - } - - /** - * 处理 worker 请求获取当前任务所有处理器节点的请求 - * @param req jobId + appId - */ - private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) { - - getSender().tell(fetchWorkerRequestHandler().processWorkerQueryExecutorCluster(req), getSelf()); - } - -} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestHttpHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestHttpHandler.java deleted file mode 100644 index 3749158f..00000000 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestHttpHandler.java +++ /dev/null @@ -1,81 +0,0 @@ -package tech.powerjob.server.core.handler.impl; - -import tech.powerjob.common.OmsConstant; -import tech.powerjob.common.ProtocolConstant; -import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; -import tech.powerjob.common.request.WorkerHeartbeat; -import tech.powerjob.common.request.WorkerLogReportReq; -import tech.powerjob.common.response.AskResponse; -import tech.powerjob.common.response.ResultDTO; -import tech.powerjob.server.common.PowerJobServerConfigKey; -import tech.powerjob.server.common.utils.PropertyUtils; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.http.HttpServer; -import io.vertx.core.http.HttpServerOptions; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.Router; -import io.vertx.ext.web.RoutingContext; -import io.vertx.ext.web.handler.BodyHandler; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.exception.ExceptionUtils; - -import java.util.Properties; - -import static tech.powerjob.server.core.handler.WorkerRequestHandlerHolder.fetchWorkerRequestHandler; - -/** - * WorkerRequestHandler - * - * @author tjq - * @since 2021/2/8 - */ -@Slf4j -public class WorkerRequestHttpHandler extends AbstractVerticle { - - @Override - public void start() throws Exception { - - Properties properties = PropertyUtils.getProperties(); - int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT))); - - HttpServerOptions options = new HttpServerOptions(); - HttpServer server = vertx.createHttpServer(options); - - Router router = Router.router(vertx); - router.route().handler(BodyHandler.create()); - router.post(ProtocolConstant.SERVER_PATH_HEARTBEAT) - .handler(ctx -> { - WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class); - fetchWorkerRequestHandler().processWorkerHeartbeat(heartbeat); - success(ctx); - }); - router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT) - .blockingHandler(ctx -> { - TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class); - try { - fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req); - out(ctx, AskResponse.succeed(null)); - } catch (Exception e) { - log.error("[WorkerRequestHttpHandler] update instance status failed for request: {}.", req, e); - out(ctx, AskResponse.failed(ExceptionUtils.getMessage(e))); - } - }); - router.post(ProtocolConstant.SERVER_PATH_LOG_REPORT) - .blockingHandler(ctx -> { - WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class); - fetchWorkerRequestHandler().processWorkerLogReport(req); - success(ctx); - }); - server.requestHandler(router).listen(port); - } - - private static void out(RoutingContext ctx, Object msg) { - ctx.response() - .putHeader(OmsConstant.HTTP_HEADER_CONTENT_TYPE, OmsConstant.JSON_MEDIA_TYPE) - .end(JsonObject.mapFrom(msg).encode()); - } - - private static void success(RoutingContext ctx) { - out(ctx, ResultDTO.success(null)); - } -}