From 4fece7be4081df0a5169d22966be642a8fe26163 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 21 Jan 2023 23:13:28 +0800 Subject: [PATCH] feat: optimize remote framework log output --- .../framework/engine/impl/ActorFactory.java | 27 ++++++++++++------- .../engine/impl/PowerJobRemoteEngine.java | 12 ++++++--- .../powerjob/remote/akka/AkkaProxyActor.java | 1 - .../remote/http/HttpVertxCSInitializer.java | 4 --- .../core/handler/AbWorkerRequestHandler.java | 9 +++++++ .../core/handler/IWorkerRequestHandler.java | 9 ------- .../server/remote/server/FriendActor.java | 2 +- 7 files changed, 35 insertions(+), 29 deletions(-) diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/ActorFactory.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/ActorFactory.java index 21f6e430..cfd9108c 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/ActorFactory.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/ActorFactory.java @@ -2,18 +2,15 @@ package tech.powerjob.remote.framework.engine.impl; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.reflections.ReflectionUtils; -import org.reflections.Reflections; -import tech.powerjob.common.OmsConstant; -import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.remote.framework.actor.*; +import tech.powerjob.remote.framework.actor.Actor; +import tech.powerjob.remote.framework.actor.ActorInfo; +import tech.powerjob.remote.framework.actor.Handler; +import tech.powerjob.remote.framework.actor.HandlerInfo; import tech.powerjob.remote.framework.base.HandlerLocation; import java.lang.reflect.Method; import java.util.List; -import java.util.Set; /** * load all Actor @@ -53,7 +50,12 @@ class ActorFactory { String rootPath = anno.path(); Object actor = actorInfo.getActor(); - Method[] declaredMethods = actor.getClass().getDeclaredMethods(); + findHandlerMethod(rootPath, actor.getClass(), ret); + return ret; + } + + private static void findHandlerMethod(String rootPath, Class clz, List result) { + Method[] declaredMethods = clz.getDeclaredMethods(); for (Method handlerMethod: declaredMethods) { Handler handlerMethodAnnotation = handlerMethod.getAnnotation(Handler.class); if (handlerMethodAnnotation == null) { @@ -68,9 +70,14 @@ class ActorFactory { .setAnno(handlerMethodAnnotation) .setMethod(handlerMethod) .setLocation(handlerLocation); - ret.add(handlerInfo); + result.add(handlerInfo); + } + + // 递归处理父类 + final Class superclass = clz.getSuperclass(); + if (superclass != null) { + findHandlerMethod(rootPath, superclass, result); } - return ret; } static String suitPath(String path) { diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java index dc09fdf5..2f9cbc6b 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java @@ -27,16 +27,17 @@ public class PowerJobRemoteEngine implements RemoteEngine { @Override public EngineOutput start(EngineConfig engineConfig) { + final String engineType = engineConfig.getType(); EngineOutput engineOutput = new EngineOutput(); - log.info("[PowerJobRemoteEngine] start remote engine with config: {}", engineConfig); + log.info("[PowerJobRemoteEngine] [{}] start remote engine with config: {}", engineType, engineConfig); List actorInfos = ActorFactory.load(engineConfig.getActorList()); - csInitializer = CSInitializerFactory.build(engineConfig.getType()); + csInitializer = CSInitializerFactory.build(engineType); String type = csInitializer.type(); Stopwatch sw = Stopwatch.createStarted(); - log.info("[PowerJobRemoteEngine] try to startup CSInitializer[type={}]", type); + log.info("[PowerJobRemoteEngine] [{}] try to startup CSInitializer[type={}]", engineType, type); csInitializer.init(new CSInitializerConfig() .setBindAddress(engineConfig.getBindAddress()) @@ -47,10 +48,13 @@ public class PowerJobRemoteEngine implements RemoteEngine { Transporter transporter = csInitializer.buildTransporter(); engineOutput.setTransporter(transporter); + log.info("[PowerJobRemoteEngine] [{}] start to bind Handler", engineType); + actorInfos.forEach(actor -> actor.getHandlerInfos().forEach(handlerInfo -> log.info("[PowerJobRemoteEngine] [{}] PATH={}, handler={}", engineType, handlerInfo.getLocation().toPath(), handlerInfo.getMethod()))); + // 绑定 handler csInitializer.bindHandlers(actorInfos); - log.info("[PowerJobRemoteEngine] startup CSInitializer[type={}] successfully, cost: {}", type, sw); + log.info("[PowerJobRemoteEngine] [{}] startup successfully, cost: {}", engineType, sw); return engineOutput; } diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProxyActor.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProxyActor.java index b2d2fc5e..af4f9441 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProxyActor.java +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProxyActor.java @@ -41,7 +41,6 @@ public class AkkaProxyActor extends AbstractActor { } final Class bindClz = powerSerializeClz.get(); receiveBuilder.match(bindClz, req -> onReceiveProcessorReportTaskStatusReq(req, handlerInfo)); - log.info("[PowerJob-AKKA] bind handler[{}] to [{}]", location, bindClz); }); this.receive = receiveBuilder.build(); } diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java index dbf62bd8..3b2012f9 100644 --- a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java +++ b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java @@ -75,12 +75,9 @@ public class HttpVertxCSInitializer implements CSInitializer { // 处理请求响应 router.route().handler(BodyHandler.create()); actorInfos.forEach(actorInfo -> { - log.info("[PowerJob-Vertx] start to bind Actor[{}]'s handler!", actorInfo.getAnno().path()); Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> { - Method method = handlerInfo.getMethod(); String handlerHttpPath = handlerInfo.getLocation().toPath(); ProcessType processType = handlerInfo.getAnno().processType(); - log.info("[PowerJob-Vertx] start to register Handler with[path={},methodName={},processType={}]", handlerHttpPath, method.getName(), processType); Handler routingContextHandler = buildRequestHandler(actorInfo, handlerInfo); Route route = router.post(handlerHttpPath); @@ -89,7 +86,6 @@ public class HttpVertxCSInitializer implements CSInitializer { } else { route.handler(routingContextHandler); } - log.info("[PowerJob-Vertx] register Handler[path={},methodName={}] successfully!", handlerHttpPath, method.getName()); }); }); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java index b0e8047c..c13c4cf6 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java @@ -10,6 +10,8 @@ import tech.powerjob.common.request.*; import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.remote.framework.actor.Handler; +import tech.powerjob.remote.framework.actor.ProcessType; import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.common.utils.SpringUtils; @@ -28,6 +30,8 @@ import java.util.Optional; import java.util.concurrent.RejectedExecutionException; import java.util.stream.Collectors; +import static tech.powerjob.common.RemoteConstant.*; + /** * wrapper monitor for IWorkerRequestHandler * @@ -55,6 +59,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler { @Override + @Handler(path = S4W_HANDLER_WORKER_HEARTBEAT, processType = ProcessType.NO_BLOCKING) public void processWorkerHeartbeat(WorkerHeartbeat heartbeat) { long startMs = System.currentTimeMillis(); WorkerHeartbeatEvent event = new WorkerHeartbeatEvent() @@ -71,6 +76,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler { } @Override + @Handler(path = S4W_HANDLER_REPORT_INSTANCE_STATUS, processType = ProcessType.BLOCKING) public Optional processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req) { long startMs = System.currentTimeMillis(); TtReportInstanceStatusEvent event = new TtReportInstanceStatusEvent() @@ -94,6 +100,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler { } @Override + @Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING) public void processWorkerLogReport(WorkerLogReportReq req) { WorkerLogReportEvent event = new WorkerLogReportEvent() @@ -113,6 +120,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler { } @Override + @Handler(path = S4W_HANDLER_QUERY_JOB_CLUSTER, processType = ProcessType.BLOCKING) public AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req) { AskResponse askResponse; @@ -137,6 +145,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler { } @Override + @Handler(path = S4W_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, processType = ProcessType.BLOCKING) public AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest req) { String port = environment.getProperty("local.server.port"); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java index 9aac2484..e55c1903 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java @@ -2,13 +2,9 @@ package tech.powerjob.server.core.handler; import tech.powerjob.common.request.*; import tech.powerjob.common.response.AskResponse; -import tech.powerjob.remote.framework.actor.Handler; -import tech.powerjob.remote.framework.actor.ProcessType; import java.util.Optional; -import static tech.powerjob.common.RemoteConstant.*; - /** * 定义 server 与 worker 之间需要处理的协议 * @@ -21,7 +17,6 @@ public interface IWorkerRequestHandler { * 处理 worker 上报的心跳信息 * @param heartbeat 心跳信息 */ - @Handler(path = S4W_HANDLER_WORKER_HEARTBEAT, processType = ProcessType.NO_BLOCKING) void processWorkerHeartbeat(WorkerHeartbeat heartbeat); /** @@ -29,7 +24,6 @@ public interface IWorkerRequestHandler { * @param req 上报请求 * @return 响应信息 */ - @Handler(path = S4W_HANDLER_REPORT_INSTANCE_STATUS, processType = ProcessType.BLOCKING) Optional processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req); /** @@ -37,14 +31,12 @@ public interface IWorkerRequestHandler { * @param req 请求 * @return cluster info */ - @Handler(path = S4W_HANDLER_QUERY_JOB_CLUSTER, processType = ProcessType.BLOCKING) AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req); /** * 处理 worker 日志推送请求(内部使用线程池异步处理,非阻塞) * @param req 请求 */ - @Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING) void processWorkerLogReport(WorkerLogReportReq req); /** @@ -52,6 +44,5 @@ public interface IWorkerRequestHandler { * @param request 请求 * @return 容器部署信息 */ - @Handler(path = S4W_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, processType = ProcessType.BLOCKING) AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest request); } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java index e6d066cc..3174780d 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java @@ -36,7 +36,7 @@ public class FriendActor { } @Handler(path = S4S_HANDLER_PROCESS, processType = ProcessType.BLOCKING) - private AskResponse onReceiveRemoteProcessReq(RemoteProcessReq req) { + public AskResponse onReceiveRemoteProcessReq(RemoteProcessReq req) { AskResponse response = new AskResponse(); response.setSuccess(true);