diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/Handler.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/Handler.java index 4a4a220e..e5d1972d 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/Handler.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/Handler.java @@ -18,4 +18,10 @@ public @interface Handler { * @return handler path */ String path(); + + /** + * 处理类型 + * @return 阻塞 or 非阻塞 + */ + ProcessType processType() default ProcessType.BLOCKING; } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/HandlerInfo.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/HandlerInfo.java index f9e860f3..dd502d9e 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/HandlerInfo.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/HandlerInfo.java @@ -26,8 +26,9 @@ public class HandlerInfo { * handler 对应的方法 */ private Method method; + /** - * actor 对象 + * Handler 注解携带的信息 */ - private transient ActorInfo actorInfo; + private Handler anno; } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/ProcessType.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/ProcessType.java new file mode 100644 index 00000000..ef3d7623 --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/ProcessType.java @@ -0,0 +1,20 @@ +package tech.powerjob.remote.framework.actor; + +/** + * 处理器类型 + * + * @author tjq + * @since 2023/1/1 + */ +public enum ProcessType { + + /** + * 阻塞式 + */ + BLOCKING, + /** + * 非阻塞式 + */ + NO_BLOCKING + +} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/HandlerLocation.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/HandlerLocation.java index 6f9cc5ba..5aba1c13 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/HandlerLocation.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/HandlerLocation.java @@ -15,7 +15,6 @@ import java.io.Serializable; */ @Getter @Setter -@ToString @Accessors(chain = true) public class HandlerLocation implements Serializable { /** @@ -30,4 +29,9 @@ public class HandlerLocation implements Serializable { public String toPath() { return String.format("/%s/%s", rootPath, methodPath); } + + @Override + public String toString() { + return toPath(); + } } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/ActorFactory.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/ActorFactory.java index 658484c0..b9c84902 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/ActorFactory.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/ActorFactory.java @@ -66,7 +66,7 @@ class ActorFactory { HandlerInfo handlerInfo = new HandlerInfo() - .setActorInfo(actorInfo) + .setAnno(handlerMethodAnnotation) .setMethod(handlerMethod) .setLocation(handlerLocation); ret.add(handlerInfo); diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/utils/RemoteUtils.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/utils/RemoteUtils.java new file mode 100644 index 00000000..d7fa159a --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/utils/RemoteUtils.java @@ -0,0 +1,35 @@ +package tech.powerjob.remote.framework.utils; + +import org.apache.commons.lang3.ArrayUtils; +import tech.powerjob.common.PowerSerializable; + +import java.util.Optional; + +/** + * RemoteUtils + * + * @author tjq + * @since 2023/1/1 + */ +public class RemoteUtils { + + public static Optional> findPowerSerialize(Class[] parameterTypes) { + + if (ArrayUtils.isEmpty(parameterTypes)) { + return Optional.empty(); + } + + for (Class clz : parameterTypes) { + final Class[] interfaces = clz.getInterfaces(); + if (ArrayUtils.isEmpty(interfaces)) { + continue; + } + + if (PowerSerializable.class.isAssignableFrom(clz)) { + return Optional.of(clz); + } + } + return Optional.empty(); + } + +} diff --git a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/utils/RemoteUtilsTest.java b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/utils/RemoteUtilsTest.java new file mode 100644 index 00000000..01b5c9c4 --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/utils/RemoteUtilsTest.java @@ -0,0 +1,35 @@ +package tech.powerjob.remote.framework.utils; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import tech.powerjob.common.model.AlarmConfig; +import tech.powerjob.common.request.ServerScheduleJobReq; + +import java.util.Optional; + + +/** + * RemoteUtilsTest + * + * @author tjq + * @since 2023/1/1 + */ +@Slf4j +class RemoteUtilsTest { + + @Test + void findPowerSerialize() { + + Class[] contains = {AlarmConfig.class, ServerScheduleJobReq.class}; + Class[] notContains = {AlarmConfig.class}; + + final Optional> notContainsResult = RemoteUtils.findPowerSerialize(notContains); + log.info("[RemoteUtilsTest] notContainsResult: {}", notContainsResult); + final Optional> containsResult = RemoteUtils.findPowerSerialize(contains); + log.info("[RemoteUtilsTest] containsResult: {}", containsResult); + + assert !notContainsResult.isPresent(); + assert containsResult.isPresent(); + + } +} \ No newline at end of file diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpCSInitializer.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpCSInitializer.java deleted file mode 100644 index 1185121e..00000000 --- a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpCSInitializer.java +++ /dev/null @@ -1,57 +0,0 @@ -package tech.powerjob.remote.http; - -import io.vertx.core.Vertx; -import io.vertx.core.VertxOptions; -import io.vertx.core.http.HttpClient; -import io.vertx.core.http.HttpServer; -import tech.powerjob.remote.framework.actor.ActorInfo; -import tech.powerjob.remote.framework.cs.CSInitializer; -import tech.powerjob.remote.framework.cs.CSInitializerConfig; -import tech.powerjob.remote.framework.transporter.Transporter; -import tech.powerjob.remote.http.vertx.VertxInitializer; -import tech.powerjob.remote.http.vertx.VertxTransporter; - -import java.io.IOException; -import java.util.List; - -/** - * HttpCSInitializer - * - * @author tjq - * @since 2022/12/31 - */ -public class HttpCSInitializer implements CSInitializer { - - private Vertx vertx; - private HttpServer httpServer; - private HttpClient httpClient; - - @Override - public String type() { - return tech.powerjob.common.enums.Protocol.HTTP.name(); - } - - @Override - public void init(CSInitializerConfig config) { - vertx = VertxInitializer.buildVertx(); - httpServer = VertxInitializer.buildHttpServer(vertx); - httpClient = VertxInitializer.buildHttpClient(vertx); - } - - @Override - public Transporter buildTransporter() { - return new VertxTransporter(httpClient); - } - - @Override - public void bindHandlers(List actorInfos) { - - } - - @Override - public void close() throws IOException { - vertx.close(); - httpServer.close(); - httpClient.close(); - } -} diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java new file mode 100644 index 00000000..a3f0b1f7 --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java @@ -0,0 +1,149 @@ +package tech.powerjob.remote.http; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpServer; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RequestBody; +import io.vertx.ext.web.Route; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.handler.BodyHandler; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ArrayUtils; +import tech.powerjob.common.PowerSerializable; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.remote.framework.actor.ActorInfo; +import tech.powerjob.remote.framework.actor.HandlerInfo; +import tech.powerjob.remote.framework.actor.ProcessType; +import tech.powerjob.remote.framework.cs.CSInitializer; +import tech.powerjob.remote.framework.cs.CSInitializerConfig; +import tech.powerjob.remote.framework.transporter.Transporter; +import tech.powerjob.remote.framework.utils.RemoteUtils; +import tech.powerjob.remote.http.vertx.VertxInitializer; +import tech.powerjob.remote.http.vertx.VertxTransporter; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** + * HttpCSInitializer + * + * @author tjq + * @since 2022/12/31 + */ +@Slf4j +public class HttpVertxCSInitializer implements CSInitializer { + + private Vertx vertx; + private HttpServer httpServer; + private HttpClient httpClient; + + private CSInitializerConfig config; + + @Override + public String type() { + return tech.powerjob.common.enums.Protocol.HTTP.name(); + } + + @Override + public void init(CSInitializerConfig config) { + this.config = config; + vertx = VertxInitializer.buildVertx(); + httpServer = VertxInitializer.buildHttpServer(vertx); + httpClient = VertxInitializer.buildHttpClient(vertx); + } + + @Override + public Transporter buildTransporter() { + return new VertxTransporter(httpClient); + } + + @Override + public void bindHandlers(List actorInfos) { + Router router = Router.router(vertx); + // 处理请求响应 + router.route().handler(BodyHandler.create()); + actorInfos.forEach(actorInfo -> { + log.info("[HttpVertxCSInitializer] start to bind Actor[{}]'s handler!", actorInfo.getAnno().path()); + Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> { + Method method = handlerInfo.getMethod(); + String handlerHttpPath = handlerInfo.getLocation().toPath(); + ProcessType processType = handlerInfo.getAnno().processType(); + log.info("[HttpVertxCSInitializer] register Handler with[path={},methodName={},processType={}]", handlerHttpPath, method.getName(), processType); + + Handler routingContextHandler = buildRequestHandler(actorInfo, handlerInfo); + Route route = router.post(handlerHttpPath); + if (processType == ProcessType.BLOCKING) { + route.blockingHandler(routingContextHandler, false); + } else { + route.handler(routingContextHandler); + } + }); + }); + + // 启动 vertx http server + final int port = config.getBindAddress().getPort(); + final String host = config.getBindAddress().getHost(); + httpServer.requestHandler(router) + .exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e)) + .listen(port, host, asyncResult -> { + if (asyncResult.succeeded()) { + log.info("[PowerJob] startup vertx HttpServer successfully!"); + } else { + log.error("[PowerJob] startup vertx HttpServer failed!", asyncResult.cause()); + throw new PowerJobException("startup vertx HttpServer failed", asyncResult.cause()); + } + }); + + } + + private Handler buildRequestHandler(ActorInfo actorInfo, HandlerInfo handlerInfo) { + return ctx -> { + final RequestBody body = ctx.body(); + final Object convertResult = convertResult(body, handlerInfo); + try { + Object response = handlerInfo.getMethod().invoke(actorInfo.getActor(), convertResult); + if (response != null) { + if (response instanceof String) { + ctx.end((String) response); + } else { + ctx.end(JsonObject.mapFrom(response).toBuffer()); + } + } + + ctx.end(); + } catch (Throwable t) { + // 注意这里是框架实际运行时,日志输出用标准 PowerJob 格式 + log.error("[PowerJob] invoke Handler[{}] failed!", handlerInfo.getLocation(), t); + ctx.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), t); + } + }; + } + + private static Object convertResult(RequestBody body, HandlerInfo handlerInfo) { + final Method method = handlerInfo.getMethod(); + + Optional> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes()); + // 内部框架,严格模式,绑定失败直接报错 + if (!powerSerializeClz.isPresent()) { + throw new PowerJobException("can't find any 'PowerSerialize' object in handler args: " + handlerInfo.getLocation()); + } + + return body.asPojo(powerSerializeClz.get()); + } + + + + @Override + public void close() throws IOException { + vertx.close(); + httpServer.close(); + httpClient.close(); + } +} diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/Test.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/Test.java deleted file mode 100644 index 227c3a1b..00000000 --- a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/Test.java +++ /dev/null @@ -1,46 +0,0 @@ -package tech.powerjob.remote.http.vertx; - -import io.vertx.core.Vertx; -import io.vertx.core.http.HttpServer; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.Router; -import io.vertx.ext.web.handler.BodyHandler; - -import java.util.Map; - -/** - * description - * - * @author tjq - * @since 2023/1/1 - */ -public class Test { - - public static void main(String[] args) { - final Vertx vertx = Vertx.vertx(); - final HttpServer httpServer = vertx.createHttpServer(); - - final Router router = Router.router(vertx); - router.route().handler(BodyHandler.create()); - router.post("/test/abc").handler(ctx -> { - - final Map data = ctx.data(); - System.out.println("ctx.data: " + data); - final String body = ctx.body().asString(); - System.out.println("request: " + body); - JsonObject jsonObject = new JsonObject(); - jsonObject.put("aa", "vv"); - - -// ctx.end(jsonObject.toBuffer()); - ctx.fail(404); - ctx.end("failedFromServer"); - }); - - httpServer - .requestHandler(router) - .exceptionHandler(e -> e.printStackTrace()) - .listen(7890); - System.out.println("aa"); - } -}