feat: optimize remote framework log output

This commit is contained in:
tjq 2023-01-21 23:13:28 +08:00
parent bfb9c68590
commit 4fece7be40
7 changed files with 35 additions and 29 deletions

View File

@ -2,18 +2,15 @@ package tech.powerjob.remote.framework.engine.impl;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.reflections.ReflectionUtils;
import org.reflections.Reflections;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.remote.framework.actor.*;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.HandlerInfo;
import tech.powerjob.remote.framework.base.HandlerLocation;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Set;
/**
* load all Actor
@ -53,7 +50,12 @@ class ActorFactory {
String rootPath = anno.path();
Object actor = actorInfo.getActor();
Method[] declaredMethods = actor.getClass().getDeclaredMethods();
findHandlerMethod(rootPath, actor.getClass(), ret);
return ret;
}
private static void findHandlerMethod(String rootPath, Class<?> clz, List<HandlerInfo> result) {
Method[] declaredMethods = clz.getDeclaredMethods();
for (Method handlerMethod: declaredMethods) {
Handler handlerMethodAnnotation = handlerMethod.getAnnotation(Handler.class);
if (handlerMethodAnnotation == null) {
@ -68,9 +70,14 @@ class ActorFactory {
.setAnno(handlerMethodAnnotation)
.setMethod(handlerMethod)
.setLocation(handlerLocation);
ret.add(handlerInfo);
result.add(handlerInfo);
}
// 递归处理父类
final Class<?> superclass = clz.getSuperclass();
if (superclass != null) {
findHandlerMethod(rootPath, superclass, result);
}
return ret;
}
static String suitPath(String path) {

View File

@ -27,16 +27,17 @@ public class PowerJobRemoteEngine implements RemoteEngine {
@Override
public EngineOutput start(EngineConfig engineConfig) {
final String engineType = engineConfig.getType();
EngineOutput engineOutput = new EngineOutput();
log.info("[PowerJobRemoteEngine] start remote engine with config: {}", engineConfig);
log.info("[PowerJobRemoteEngine] [{}] start remote engine with config: {}", engineType, engineConfig);
List<ActorInfo> actorInfos = ActorFactory.load(engineConfig.getActorList());
csInitializer = CSInitializerFactory.build(engineConfig.getType());
csInitializer = CSInitializerFactory.build(engineType);
String type = csInitializer.type();
Stopwatch sw = Stopwatch.createStarted();
log.info("[PowerJobRemoteEngine] try to startup CSInitializer[type={}]", type);
log.info("[PowerJobRemoteEngine] [{}] try to startup CSInitializer[type={}]", engineType, type);
csInitializer.init(new CSInitializerConfig()
.setBindAddress(engineConfig.getBindAddress())
@ -47,10 +48,13 @@ public class PowerJobRemoteEngine implements RemoteEngine {
Transporter transporter = csInitializer.buildTransporter();
engineOutput.setTransporter(transporter);
log.info("[PowerJobRemoteEngine] [{}] start to bind Handler", engineType);
actorInfos.forEach(actor -> actor.getHandlerInfos().forEach(handlerInfo -> log.info("[PowerJobRemoteEngine] [{}] PATH={}, handler={}", engineType, handlerInfo.getLocation().toPath(), handlerInfo.getMethod())));
// 绑定 handler
csInitializer.bindHandlers(actorInfos);
log.info("[PowerJobRemoteEngine] startup CSInitializer[type={}] successfully, cost: {}", type, sw);
log.info("[PowerJobRemoteEngine] [{}] startup successfully, cost: {}", engineType, sw);
return engineOutput;
}

View File

@ -41,7 +41,6 @@ public class AkkaProxyActor extends AbstractActor {
}
final Class<?> bindClz = powerSerializeClz.get();
receiveBuilder.match(bindClz, req -> onReceiveProcessorReportTaskStatusReq(req, handlerInfo));
log.info("[PowerJob-AKKA] bind handler[{}] to [{}]", location, bindClz);
});
this.receive = receiveBuilder.build();
}

View File

@ -75,12 +75,9 @@ public class HttpVertxCSInitializer implements CSInitializer {
// 处理请求响应
router.route().handler(BodyHandler.create());
actorInfos.forEach(actorInfo -> {
log.info("[PowerJob-Vertx] start to bind Actor[{}]'s handler!", actorInfo.getAnno().path());
Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> {
Method method = handlerInfo.getMethod();
String handlerHttpPath = handlerInfo.getLocation().toPath();
ProcessType processType = handlerInfo.getAnno().processType();
log.info("[PowerJob-Vertx] start to register Handler with[path={},methodName={},processType={}]", handlerHttpPath, method.getName(), processType);
Handler<RoutingContext> routingContextHandler = buildRequestHandler(actorInfo, handlerInfo);
Route route = router.post(handlerHttpPath);
@ -89,7 +86,6 @@ public class HttpVertxCSInitializer implements CSInitializer {
} else {
route.handler(routingContextHandler);
}
log.info("[PowerJob-Vertx] register Handler[path={},methodName={}] successfully!", handlerHttpPath, method.getName());
});
});

View File

@ -10,6 +10,8 @@ import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.ProcessType;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.common.utils.SpringUtils;
@ -28,6 +30,8 @@ import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import static tech.powerjob.common.RemoteConstant.*;
/**
* wrapper monitor for IWorkerRequestHandler
*
@ -55,6 +59,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
@Override
@Handler(path = S4W_HANDLER_WORKER_HEARTBEAT, processType = ProcessType.NO_BLOCKING)
public void processWorkerHeartbeat(WorkerHeartbeat heartbeat) {
long startMs = System.currentTimeMillis();
WorkerHeartbeatEvent event = new WorkerHeartbeatEvent()
@ -71,6 +76,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
}
@Override
@Handler(path = S4W_HANDLER_REPORT_INSTANCE_STATUS, processType = ProcessType.BLOCKING)
public Optional<AskResponse> processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req) {
long startMs = System.currentTimeMillis();
TtReportInstanceStatusEvent event = new TtReportInstanceStatusEvent()
@ -94,6 +100,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
}
@Override
@Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
public void processWorkerLogReport(WorkerLogReportReq req) {
WorkerLogReportEvent event = new WorkerLogReportEvent()
@ -113,6 +120,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
}
@Override
@Handler(path = S4W_HANDLER_QUERY_JOB_CLUSTER, processType = ProcessType.BLOCKING)
public AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req) {
AskResponse askResponse;
@ -137,6 +145,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
}
@Override
@Handler(path = S4W_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, processType = ProcessType.BLOCKING)
public AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest req) {
String port = environment.getProperty("local.server.port");

View File

@ -2,13 +2,9 @@ package tech.powerjob.server.core.handler;
import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.ProcessType;
import java.util.Optional;
import static tech.powerjob.common.RemoteConstant.*;
/**
* 定义 server worker 之间需要处理的协议
*
@ -21,7 +17,6 @@ public interface IWorkerRequestHandler {
* 处理 worker 上报的心跳信息
* @param heartbeat 心跳信息
*/
@Handler(path = S4W_HANDLER_WORKER_HEARTBEAT, processType = ProcessType.NO_BLOCKING)
void processWorkerHeartbeat(WorkerHeartbeat heartbeat);
/**
@ -29,7 +24,6 @@ public interface IWorkerRequestHandler {
* @param req 上报请求
* @return 响应信息
*/
@Handler(path = S4W_HANDLER_REPORT_INSTANCE_STATUS, processType = ProcessType.BLOCKING)
Optional<AskResponse> processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req);
/**
@ -37,14 +31,12 @@ public interface IWorkerRequestHandler {
* @param req 请求
* @return cluster info
*/
@Handler(path = S4W_HANDLER_QUERY_JOB_CLUSTER, processType = ProcessType.BLOCKING)
AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req);
/**
* 处理 worker 日志推送请求内部使用线程池异步处理非阻塞
* @param req 请求
*/
@Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
void processWorkerLogReport(WorkerLogReportReq req);
/**
@ -52,6 +44,5 @@ public interface IWorkerRequestHandler {
* @param request 请求
* @return 容器部署信息
*/
@Handler(path = S4W_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, processType = ProcessType.BLOCKING)
AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest request);
}

View File

@ -36,7 +36,7 @@ public class FriendActor {
}
@Handler(path = S4S_HANDLER_PROCESS, processType = ProcessType.BLOCKING)
private AskResponse onReceiveRemoteProcessReq(RemoteProcessReq req) {
public AskResponse onReceiveRemoteProcessReq(RemoteProcessReq req) {
AskResponse response = new AskResponse();
response.setSuccess(true);