feat: vertx http framwork

This commit is contained in:
tjq 2023-01-01 20:12:00 +08:00
parent 268f5dd5c7
commit 87a1a1d7c1
7 changed files with 223 additions and 7 deletions

View File

@ -26,4 +26,8 @@ public class HandlerLocation implements Serializable {
* 方法路径
*/
private String methodPath;
public String toPath() {
return String.format("/%s/%s", rootPath, methodPath);
}
}

View File

@ -8,7 +8,9 @@ import java.io.IOException;
* @author tjq
* @since 2022/12/31
*/
public class RemotingException extends IOException {
public class RemotingException extends RuntimeException {
public RemotingException(String message) {
super(message);
}
}

View File

@ -1,9 +1,15 @@
package tech.powerjob.remote.http;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpServer;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.remote.http.vertx.VertxInitializer;
import tech.powerjob.remote.http.vertx.VertxTransporter;
import java.io.IOException;
import java.util.List;
@ -16,19 +22,25 @@ import java.util.List;
*/
public class HttpCSInitializer implements CSInitializer {
private Vertx vertx;
private HttpServer httpServer;
private HttpClient httpClient;
@Override
public String type() {
return null;
return tech.powerjob.common.enums.Protocol.HTTP.name();
}
@Override
public void init(CSInitializerConfig config) {
vertx = VertxInitializer.buildVertx();
httpServer = VertxInitializer.buildHttpServer(vertx);
httpClient = VertxInitializer.buildHttpClient(vertx);
}
@Override
public Transporter buildTransporter() {
return null;
return new VertxTransporter(httpClient);
}
@Override
@ -38,6 +50,8 @@ public class HttpCSInitializer implements CSInitializer {
@Override
public void close() throws IOException {
vertx.close();
httpServer.close();
httpClient.close();
}
}

View File

@ -12,6 +12,6 @@ public class HttpProtocol implements Protocol {
@Override
public String name() {
return null;
return tech.powerjob.common.enums.Protocol.HTTP.name();
}
}

View File

@ -0,0 +1,46 @@
package tech.powerjob.remote.http.vertx;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import java.util.Map;
/**
* description
*
* @author tjq
* @since 2023/1/1
*/
public class Test {
public static void main(String[] args) {
final Vertx vertx = Vertx.vertx();
final HttpServer httpServer = vertx.createHttpServer();
final Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
router.post("/test/abc").handler(ctx -> {
final Map<String, Object> data = ctx.data();
System.out.println("ctx.data: " + data);
final String body = ctx.body().asString();
System.out.println("request: " + body);
JsonObject jsonObject = new JsonObject();
jsonObject.put("aa", "vv");
// ctx.end(jsonObject.toBuffer());
ctx.fail(404);
ctx.end("failedFromServer");
});
httpServer
.requestHandler(router)
.exceptionHandler(e -> e.printStackTrace())
.listen(7890);
System.out.println("aa");
}
}

View File

@ -0,0 +1,51 @@
package tech.powerjob.remote.http.vertx;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import lombok.extern.slf4j.Slf4j;
/**
* VertxInitializer
* PowerJob 只是将 vertx 作为 toolkit 使用
*
* @author tjq
* @since 2023/1/1
*/
@Slf4j
public class VertxInitializer {
public static Vertx buildVertx() {
VertxOptions options = new VertxOptions();
log.info("[PowerJob-Vertx] use vertx options: {}", options);
return Vertx.vertx(options);
}
public static HttpServer buildHttpServer(Vertx vertx) {
HttpServerOptions httpServerOptions = new HttpServerOptions();
tryEnableCompression(httpServerOptions);
log.info("[PowerJob-Vertx] use HttpServerOptions: {}", httpServerOptions);
return vertx.createHttpServer(httpServerOptions);
}
private static void tryEnableCompression(HttpServerOptions httpServerOptions) {
// 非核心组件不直接依赖类 import加载报错可忽略
try {
httpServerOptions
.addCompressor(io.netty.handler.codec.compression.StandardCompressionOptions.gzip())
.setCompressionSupported(true);
log.warn("[PowerJob-Vertx] enable server side compression successfully!");
} catch (Exception e) {
log.warn("[PowerJob-Vertx] enable server side compression failed!", e);
}
}
public static HttpClient buildHttpClient(Vertx vertx) {
HttpClientOptions httpClientOptions = new HttpClientOptions();
log.info("[PowerJob-Vertx] use HttpClientOptions: {}", httpClientOptions);
return vertx.createHttpClient(httpClientOptions);
}
}

View File

@ -0,0 +1,99 @@
package tech.powerjob.remote.http.vertx;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.*;
import io.vertx.core.json.JsonObject;
import lombok.SneakyThrows;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.HandlerLocation;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.transporter.Protocol;
import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.remote.http.HttpProtocol;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
/**
* VertxTransporter
*
* @author tjq
* @since 2023/1/1
*/
public class VertxTransporter implements Transporter {
private final HttpClient httpClient;
private static final Protocol PROTOCOL = new HttpProtocol();
public VertxTransporter(HttpClient httpClient) {
this.httpClient = httpClient;
}
@Override
public Protocol getProtocol() {
return PROTOCOL;
}
@Override
public void tell(URL url, PowerSerializable request) {
post(url, request);
}
@Override
public CompletionStage<Object> ask(URL url, PowerSerializable request, ExecutorService executorService) throws RemotingException {
return post(url, request);
}
private CompletionStage<Object> post(URL url, PowerSerializable request) {
final String host = url.getAddress().getHost();
final int port = url.getAddress().getPort();
final String path = url.getLocation().toPath();
RequestOptions requestOptions = new RequestOptions()
.setMethod(HttpMethod.POST)
.setHost(host)
.setPort(port)
.setURI(path);
// 获取远程服务器的HTTP连接
Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);
// 转换 -> 发送请求获取响应
Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest -> httpClientRequest.send(JsonObject.mapFrom(request).toBuffer()));
return responseFuture.compose(httpClientResponse -> {
// throw exception
final int statusCode = httpClientResponse.statusCode();
if (statusCode != HttpResponseStatus.OK.code()) {
// CompletableFuture.get() 时会传递抛出该异常
throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",
host, port, path, statusCode, httpClientResponse.statusMessage()
));
}
return httpClientResponse.body().compose(x -> {
// TODO: 类型转换
return Future.succeededFuture(x.toJson());
});
}).toCompletionStage();
}
@SneakyThrows
public static void main(String[] args) {
final Vertx vertx = Vertx.vertx();
final HttpClient hc = Vertx.vertx().createHttpClient();
VertxTransporter transport = new VertxTransporter(hc);
ServerScheduleJobReq serverScheduleJobReq = new ServerScheduleJobReq();
serverScheduleJobReq.setJobId(1234L);
serverScheduleJobReq.setJobParams("asdasdas");
URL url = new URL();
url.setAddress(new Address().setHost("127.0.0.1").setPort(7890));
url.setLocation(new HandlerLocation().setRootPath("test").setMethodPath("abc"));
final CompletionStage<Object> post = transport.post(url, serverScheduleJobReq);
System.out.println(post.toCompletableFuture().get());
}
}