From 87a1a1d7c1b79f2cc052b80548f7cc76626e51a8 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 1 Jan 2023 20:12:00 +0800 Subject: [PATCH] feat: vertx http framwork --- .../framework/base/HandlerLocation.java | 4 + .../framework/base/RemotingException.java | 6 +- .../remote/http/HttpCSInitializer.java | 22 ++++- .../powerjob/remote/http/HttpProtocol.java | 2 +- .../tech/powerjob/remote/http/vertx/Test.java | 46 +++++++++ .../remote/http/vertx/VertxInitializer.java | 51 ++++++++++ .../remote/http/vertx/VertxTransporter.java | 99 +++++++++++++++++++ 7 files changed, 223 insertions(+), 7 deletions(-) create mode 100644 powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/Test.java create mode 100644 powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxInitializer.java create mode 100644 powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxTransporter.java diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/HandlerLocation.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/HandlerLocation.java index 8a2ad028..6f9cc5ba 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/HandlerLocation.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/HandlerLocation.java @@ -26,4 +26,8 @@ public class HandlerLocation implements Serializable { * 方法路径 */ private String methodPath; + + public String toPath() { + return String.format("/%s/%s", rootPath, methodPath); + } } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/RemotingException.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/RemotingException.java index 24de209b..fd3185db 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/RemotingException.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/RemotingException.java @@ -8,7 +8,9 @@ import java.io.IOException; * @author tjq * @since 2022/12/31 */ -public class RemotingException extends IOException { - +public class RemotingException extends RuntimeException { + public RemotingException(String message) { + super(message); + } } diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpCSInitializer.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpCSInitializer.java index 3d28f9aa..1185121e 100644 --- a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpCSInitializer.java +++ b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpCSInitializer.java @@ -1,9 +1,15 @@ package tech.powerjob.remote.http; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpServer; import tech.powerjob.remote.framework.actor.ActorInfo; import tech.powerjob.remote.framework.cs.CSInitializer; import tech.powerjob.remote.framework.cs.CSInitializerConfig; import tech.powerjob.remote.framework.transporter.Transporter; +import tech.powerjob.remote.http.vertx.VertxInitializer; +import tech.powerjob.remote.http.vertx.VertxTransporter; import java.io.IOException; import java.util.List; @@ -16,19 +22,25 @@ import java.util.List; */ public class HttpCSInitializer implements CSInitializer { + private Vertx vertx; + private HttpServer httpServer; + private HttpClient httpClient; + @Override public String type() { - return null; + return tech.powerjob.common.enums.Protocol.HTTP.name(); } @Override public void init(CSInitializerConfig config) { - + vertx = VertxInitializer.buildVertx(); + httpServer = VertxInitializer.buildHttpServer(vertx); + httpClient = VertxInitializer.buildHttpClient(vertx); } @Override public Transporter buildTransporter() { - return null; + return new VertxTransporter(httpClient); } @Override @@ -38,6 +50,8 @@ public class HttpCSInitializer implements CSInitializer { @Override public void close() throws IOException { - + vertx.close(); + httpServer.close(); + httpClient.close(); } } diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpProtocol.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpProtocol.java index 0b15a8a8..2e874652 100644 --- a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpProtocol.java +++ b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpProtocol.java @@ -12,6 +12,6 @@ public class HttpProtocol implements Protocol { @Override public String name() { - return null; + return tech.powerjob.common.enums.Protocol.HTTP.name(); } } diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/Test.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/Test.java new file mode 100644 index 00000000..227c3a1b --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/Test.java @@ -0,0 +1,46 @@ +package tech.powerjob.remote.http.vertx; + +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.handler.BodyHandler; + +import java.util.Map; + +/** + * description + * + * @author tjq + * @since 2023/1/1 + */ +public class Test { + + public static void main(String[] args) { + final Vertx vertx = Vertx.vertx(); + final HttpServer httpServer = vertx.createHttpServer(); + + final Router router = Router.router(vertx); + router.route().handler(BodyHandler.create()); + router.post("/test/abc").handler(ctx -> { + + final Map data = ctx.data(); + System.out.println("ctx.data: " + data); + final String body = ctx.body().asString(); + System.out.println("request: " + body); + JsonObject jsonObject = new JsonObject(); + jsonObject.put("aa", "vv"); + + +// ctx.end(jsonObject.toBuffer()); + ctx.fail(404); + ctx.end("failedFromServer"); + }); + + httpServer + .requestHandler(router) + .exceptionHandler(e -> e.printStackTrace()) + .listen(7890); + System.out.println("aa"); + } +} diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxInitializer.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxInitializer.java new file mode 100644 index 00000000..c7aa9dfd --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxInitializer.java @@ -0,0 +1,51 @@ +package tech.powerjob.remote.http.vertx; + +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import lombok.extern.slf4j.Slf4j; + +/** + * VertxInitializer + * PowerJob 只是将 vertx 作为 toolkit 使用 + * + * @author tjq + * @since 2023/1/1 + */ +@Slf4j +public class VertxInitializer { + + public static Vertx buildVertx() { + VertxOptions options = new VertxOptions(); + log.info("[PowerJob-Vertx] use vertx options: {}", options); + return Vertx.vertx(options); + } + + public static HttpServer buildHttpServer(Vertx vertx) { + HttpServerOptions httpServerOptions = new HttpServerOptions(); + tryEnableCompression(httpServerOptions); + log.info("[PowerJob-Vertx] use HttpServerOptions: {}", httpServerOptions); + return vertx.createHttpServer(httpServerOptions); + } + private static void tryEnableCompression(HttpServerOptions httpServerOptions) { + // 非核心组件,不直接依赖类(无 import),加载报错可忽略 + try { + httpServerOptions + .addCompressor(io.netty.handler.codec.compression.StandardCompressionOptions.gzip()) + .setCompressionSupported(true); + log.warn("[PowerJob-Vertx] enable server side compression successfully!"); + } catch (Exception e) { + log.warn("[PowerJob-Vertx] enable server side compression failed!", e); + } + } + + public static HttpClient buildHttpClient(Vertx vertx) { + HttpClientOptions httpClientOptions = new HttpClientOptions(); + log.info("[PowerJob-Vertx] use HttpClientOptions: {}", httpClientOptions); + return vertx.createHttpClient(httpClientOptions); + } + +} diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxTransporter.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxTransporter.java new file mode 100644 index 00000000..eaadd5ab --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxTransporter.java @@ -0,0 +1,99 @@ +package tech.powerjob.remote.http.vertx; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.http.*; +import io.vertx.core.json.JsonObject; +import lombok.SneakyThrows; +import tech.powerjob.common.PowerSerializable; +import tech.powerjob.common.request.ServerScheduleJobReq; +import tech.powerjob.remote.framework.base.Address; +import tech.powerjob.remote.framework.base.HandlerLocation; +import tech.powerjob.remote.framework.base.RemotingException; +import tech.powerjob.remote.framework.base.URL; +import tech.powerjob.remote.framework.transporter.Protocol; +import tech.powerjob.remote.framework.transporter.Transporter; +import tech.powerjob.remote.http.HttpProtocol; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; + +/** + * VertxTransporter + * + * @author tjq + * @since 2023/1/1 + */ +public class VertxTransporter implements Transporter { + + private final HttpClient httpClient; + + private static final Protocol PROTOCOL = new HttpProtocol(); + + public VertxTransporter(HttpClient httpClient) { + this.httpClient = httpClient; + } + + @Override + public Protocol getProtocol() { + return PROTOCOL; + } + + @Override + public void tell(URL url, PowerSerializable request) { + post(url, request); + } + + @Override + public CompletionStage ask(URL url, PowerSerializable request, ExecutorService executorService) throws RemotingException { + return post(url, request); + } + + private CompletionStage post(URL url, PowerSerializable request) { + final String host = url.getAddress().getHost(); + final int port = url.getAddress().getPort(); + final String path = url.getLocation().toPath(); + RequestOptions requestOptions = new RequestOptions() + .setMethod(HttpMethod.POST) + .setHost(host) + .setPort(port) + .setURI(path); + // 获取远程服务器的HTTP连接 + Future httpClientRequestFuture = httpClient.request(requestOptions); + // 转换 -> 发送请求获取响应 + Future responseFuture = httpClientRequestFuture.compose(httpClientRequest -> httpClientRequest.send(JsonObject.mapFrom(request).toBuffer())); + return responseFuture.compose(httpClientResponse -> { + // throw exception + final int statusCode = httpClientResponse.statusCode(); + if (statusCode != HttpResponseStatus.OK.code()) { + // CompletableFuture.get() 时会传递抛出该异常 + throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s", + host, port, path, statusCode, httpClientResponse.statusMessage() + )); + } + return httpClientResponse.body().compose(x -> { + // TODO: 类型转换 + return Future.succeededFuture(x.toJson()); + }); + }).toCompletionStage(); + } + + @SneakyThrows + public static void main(String[] args) { + final Vertx vertx = Vertx.vertx(); + final HttpClient hc = Vertx.vertx().createHttpClient(); + VertxTransporter transport = new VertxTransporter(hc); + + ServerScheduleJobReq serverScheduleJobReq = new ServerScheduleJobReq(); + serverScheduleJobReq.setJobId(1234L); + serverScheduleJobReq.setJobParams("asdasdas"); + + URL url = new URL(); + url.setAddress(new Address().setHost("127.0.0.1").setPort(7890)); + url.setLocation(new HandlerLocation().setRootPath("test").setMethodPath("abc")); + + final CompletionStage post = transport.post(url, serverScheduleJobReq); + System.out.println(post.toCompletableFuture().get()); + } +}