From 7c555ae731324d0a732b78063a1940e3f56e59ac Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 16 Nov 2023 22:41:59 +0800 Subject: [PATCH] feat: HttpProxyService --- others/shade/vertx/pom.xml | 7 + .../remote/benchmark/EngineService.java | 2 +- .../powerjob-remote-framework/pom.xml | 16 +- .../framework/cs/CSInitializerConfig.java | 2 - .../remote/framework/engine/RemoteEngine.java | 2 + .../engine/{ => config}/EngineConfig.java | 6 +- .../{cs => engine/config}/ProxyConfig.java | 4 +- .../engine/impl/PowerJobRemoteEngine.java | 14 +- .../framework/proxy/HttpProxyService.java | 239 ++++++++++++++++++ .../remote/framework/proxy/ProxyRequest.java | 38 --- .../remote/framework/proxy/ProxyService.java | 15 +- .../framework/proxy/ProxyTransporter.java | 96 ------- .../proxy/{ => module}/ProxyMethod.java | 2 +- .../framework/proxy/module/ProxyRequest.java | 67 +++++ .../proxy/{ => module}/ProxyResult.java | 2 +- .../framework/engine/RemoteEngineTest.java | 4 +- .../framework/proxy/HttpProxyServiceTest.java | 58 +++++ .../framework/test/TestTransporter.java | 41 +++ .../http/HttpVertxCSInitializerTest.java | 2 +- .../impl/PowerTransportService.java | 2 +- .../tech/powerjob/worker/PowerJobWorker.java | 2 +- 21 files changed, 458 insertions(+), 163 deletions(-) rename powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/{ => config}/EngineConfig.java (85%) rename powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/{cs => engine/config}/ProxyConfig.java (90%) create mode 100644 powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/HttpProxyService.java delete mode 100644 powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyRequest.java delete mode 100644 powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyTransporter.java rename powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/{ => module}/ProxyMethod.java (90%) create mode 100644 powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyRequest.java rename powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/{ => module}/ProxyResult.java (86%) create mode 100644 powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/proxy/HttpProxyServiceTest.java create mode 100644 powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/test/TestTransporter.java diff --git a/others/shade/vertx/pom.xml b/others/shade/vertx/pom.xml index 565b4a53..ff8472a5 100644 --- a/others/shade/vertx/pom.xml +++ b/others/shade/vertx/pom.xml @@ -50,6 +50,7 @@ UTF-8 4.4.6 + 2.15.0 @@ -64,6 +65,12 @@ vertx-web ${vertx.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + diff --git a/powerjob-remote/powerjob-remote-benchmark/src/main/java/tech/powerjob/remote/benchmark/EngineService.java b/powerjob-remote/powerjob-remote-benchmark/src/main/java/tech/powerjob/remote/benchmark/EngineService.java index 41079878..bb0eecd8 100644 --- a/powerjob-remote/powerjob-remote-benchmark/src/main/java/tech/powerjob/remote/benchmark/EngineService.java +++ b/powerjob-remote/powerjob-remote-benchmark/src/main/java/tech/powerjob/remote/benchmark/EngineService.java @@ -7,7 +7,7 @@ import tech.powerjob.common.enums.Protocol; import tech.powerjob.remote.framework.BenchmarkActor; import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.ServerType; -import tech.powerjob.remote.framework.engine.EngineConfig; +import tech.powerjob.remote.framework.engine.config.EngineConfig; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; import tech.powerjob.remote.framework.transporter.Transporter; diff --git a/powerjob-remote/powerjob-remote-framework/pom.xml b/powerjob-remote/powerjob-remote-framework/pom.xml index e63f358a..eebd2010 100644 --- a/powerjob-remote/powerjob-remote-framework/pom.xml +++ b/powerjob-remote/powerjob-remote-framework/pom.xml @@ -18,9 +18,9 @@ UTF-8 4.3.6 + 0.10.2 - - + 4.4.6 @@ -36,6 +36,18 @@ ${reflections.version} + + tech.powerjob + powerjob-shade-vertx + ${powerjob-shade-vertx.version} + + + * + * + + + + \ No newline at end of file diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java index 50d22c3f..a64051fd 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java @@ -22,6 +22,4 @@ public class CSInitializerConfig implements Serializable { private Address bindAddress; private ServerType serverType; - - private ProxyConfig proxyConfig; } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/RemoteEngine.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/RemoteEngine.java index e86d2722..5320c085 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/RemoteEngine.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/RemoteEngine.java @@ -1,5 +1,7 @@ package tech.powerjob.remote.framework.engine; +import tech.powerjob.remote.framework.engine.config.EngineConfig; + import java.io.IOException; /** diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/config/EngineConfig.java similarity index 85% rename from powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java rename to powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/config/EngineConfig.java index a9b82961..1d8cb40b 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/config/EngineConfig.java @@ -1,4 +1,4 @@ -package tech.powerjob.remote.framework.engine; +package tech.powerjob.remote.framework.engine.config; import lombok.Data; import lombok.experimental.Accessors; @@ -30,6 +30,10 @@ public class EngineConfig implements Serializable { * 绑定的本地地址 */ private Address bindAddress; + /** + * 代理配置 + */ + private ProxyConfig proxyConfig; /** * actor实例,交由使用侧自己实例化以便自行注入各种 bean */ diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/ProxyConfig.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/config/ProxyConfig.java similarity index 90% rename from powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/ProxyConfig.java rename to powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/config/ProxyConfig.java index 6a1822a1..9eb5f72d 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/ProxyConfig.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/config/ProxyConfig.java @@ -1,4 +1,4 @@ -package tech.powerjob.remote.framework.cs; +package tech.powerjob.remote.framework.engine.config; import lombok.Data; import lombok.experimental.Accessors; @@ -22,7 +22,7 @@ public class ProxyConfig implements Serializable { /** * 本机启动的代理服务器端口,当 enableProxyServer 为 true 时有效 */ - private Integer proxyServerPort; + private Integer proxyServerPort = 9999; /* ******************* 上述配置是本机自身行为,下面的配置是对外的访问行为,请勿混淆 ******************* */ /** diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java index 2f9cbc6b..36714f9a 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java @@ -5,9 +5,12 @@ import lombok.extern.slf4j.Slf4j; import tech.powerjob.remote.framework.actor.ActorInfo; import tech.powerjob.remote.framework.cs.CSInitializer; import tech.powerjob.remote.framework.cs.CSInitializerConfig; -import tech.powerjob.remote.framework.engine.EngineConfig; +import tech.powerjob.remote.framework.engine.config.ProxyConfig; +import tech.powerjob.remote.framework.engine.config.EngineConfig; import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.RemoteEngine; +import tech.powerjob.remote.framework.proxy.HttpProxyService; +import tech.powerjob.remote.framework.proxy.ProxyService; import tech.powerjob.remote.framework.transporter.Transporter; import java.io.IOException; @@ -46,16 +49,21 @@ public class PowerJobRemoteEngine implements RemoteEngine { // 构建通讯器 Transporter transporter = csInitializer.buildTransporter(); - engineOutput.setTransporter(transporter); log.info("[PowerJobRemoteEngine] [{}] start to bind Handler", engineType); actorInfos.forEach(actor -> actor.getHandlerInfos().forEach(handlerInfo -> log.info("[PowerJobRemoteEngine] [{}] PATH={}, handler={}", engineType, handlerInfo.getLocation().toPath(), handlerInfo.getMethod()))); // 绑定 handler csInitializer.bindHandlers(actorInfos); - log.info("[PowerJobRemoteEngine] [{}] startup successfully, cost: {}", engineType, sw); + // 处理代理服务器 + ProxyConfig proxyConfig = engineConfig.getProxyConfig(); + ProxyService proxyService = new HttpProxyService(transporter); + proxyService.initializeProxyServer(proxyConfig); + transporter = proxyService.warpProxyTransporter(engineConfig.getServerType()); + + engineOutput.setTransporter(transporter); return engineOutput; } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/HttpProxyService.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/HttpProxyService.java new file mode 100644 index 00000000..9d88f7da --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/HttpProxyService.java @@ -0,0 +1,239 @@ +package tech.powerjob.remote.framework.proxy; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import tech.powerjob.common.PowerSerializable; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.remote.framework.base.RemotingException; +import tech.powerjob.remote.framework.base.ServerType; +import tech.powerjob.remote.framework.base.URL; +import tech.powerjob.remote.framework.engine.config.ProxyConfig; +import tech.powerjob.remote.framework.proxy.module.ProxyMethod; +import tech.powerjob.remote.framework.proxy.module.ProxyRequest; +import tech.powerjob.remote.framework.proxy.module.ProxyResult; +import tech.powerjob.remote.framework.transporter.Protocol; +import tech.powerjob.remote.framework.transporter.Transporter; +import tech.powerjob.shade.io.netty.handler.codec.http.HttpHeaderNames; +import tech.powerjob.shade.io.netty.handler.codec.http.HttpHeaderValues; +import tech.powerjob.shade.io.netty.handler.codec.http.HttpResponseStatus; +import tech.powerjob.shade.io.vertx.core.Future; +import tech.powerjob.shade.io.vertx.core.Vertx; +import tech.powerjob.shade.io.vertx.core.VertxOptions; +import tech.powerjob.shade.io.vertx.core.http.*; +import tech.powerjob.shade.io.vertx.core.json.JsonObject; +import tech.powerjob.shade.io.vertx.ext.web.RequestBody; +import tech.powerjob.shade.io.vertx.ext.web.Router; +import tech.powerjob.shade.io.vertx.ext.web.handler.BodyHandler; + +import java.util.Objects; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +/** + * HTTP 代理服务 + * + * @author tjq + * @since 2023/11/16 + */ +@Slf4j +public class HttpProxyService implements ProxyService { + + private Vertx _vertx; + private ProxyConfig proxyConfig; + + private final Transporter transporter; + + private static final String PROXY_PATH = "/proxy"; + + public HttpProxyService(Transporter transporter) { + this.transporter = transporter; + } + + @Override + @SneakyThrows + public void initializeProxyServer(ProxyConfig proxyConfig) { + + this.proxyConfig = proxyConfig; + if (proxyConfig == null || !proxyConfig.isEnableProxyServer()) { + log.info("[HttpProxyService] no proxy server config, skip initialize."); + return; + } + + log.info("[HttpProxyService] start to initialize proxy server by proxy config: {}", proxyConfig); + + HttpServerOptions httpServerOptions = new HttpServerOptions().setIdleTimeout(300); + HttpServer httpServer = vertx().createHttpServer(httpServerOptions); + Router router = Router.router(vertx()); + router.route().handler(BodyHandler.create()); + + router.post(PROXY_PATH).blockingHandler(ctx -> { + final RequestBody body = ctx.body(); + ProxyRequest proxyRequest = body.asPojo(ProxyRequest.class); + + PowerSerializable ret = (PowerSerializable) JsonUtils.parseObjectUnsafe(proxyRequest.getRequest(), proxyRequest.getClz()); + + if (ProxyMethod.TELL.getV().equals(proxyRequest.getProxyMethod())) { + transporter.tell(proxyRequest.getUrl(), ret); + ctx.json(JsonObject.mapFrom(new ProxyResult().setSuccess(true))); + return; + } + + ProxyResult proxyResult = new ProxyResult(); + try { + CompletionStage proxyRequestStage = transporter.ask(proxyRequest.getUrl(), ret, Object.class); + Object originResult = proxyRequestStage.toCompletableFuture().get(1, TimeUnit.SECONDS); + proxyResult.setSuccess(true).setData(JsonUtils.toJSONString(originResult)); + } catch (Exception e) { + proxyResult.setSuccess(false).setMsg(ExceptionUtils.getMessage(e)); + log.error("[HttpProxyService] proxy request failed!", e); + } + + log.debug("[HttpProxyService] send proxy result: {}", proxyResult); + ctx.json(JsonObject.mapFrom(proxyResult)); + }); + + Integer proxyServerPort = proxyConfig.getProxyServerPort(); + + httpServer.requestHandler(router) + .exceptionHandler(e -> log.error("[HttpProxyService] unknown exception in Actor communication!", e)) + .listen(proxyServerPort) + .toCompletionStage() + .toCompletableFuture() + .get(1, TimeUnit.MINUTES); + + log.info("[HttpProxyService] initialize proxy server in port: {}", proxyServerPort); + } + + @Override + public Transporter warpProxyTransporter(ServerType currentServerType) { + + if (proxyConfig.isUseProxy()) { + String proxyUrl = proxyConfig.getProxyUrl(); + if (StringUtils.isEmpty(proxyUrl)) { + throw new IllegalArgumentException("when you use proxy, you must set the proxy url(ProxyConfig.proxyUrl)!"); + } + log.info("[HttpProxyService] use proxy to visit other type node, proxy url: {}", proxyUrl); + return new ProxyTransporter(currentServerType); + } + return transporter; + } + + CompletionStage sendProxyRequest(ProxyRequest proxyRequest) { + + String fullUrl = String.format("%s/%s", proxyConfig.getProxyUrl(), PROXY_PATH); + + HttpClient httpClient = vertx().createHttpClient(); + RequestOptions requestOptions = new RequestOptions() + .setMethod(HttpMethod.POST) + .setAbsoluteURI(fullUrl); + // 转换 -> 发送请求获取响应 + Future responseFuture = httpClient + .request(requestOptions) + .compose(httpClientRequest -> + httpClientRequest + .putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON) + .send(JsonObject.mapFrom(proxyRequest).toBuffer()) + ); + return responseFuture.compose(httpClientResponse -> { + // throw exception + final int statusCode = httpClientResponse.statusCode(); + if (statusCode != HttpResponseStatus.OK.code()) { + // CompletableFuture.get() 时会传递抛出该异常 + throw new RemotingException(String.format("request [%s] failed, status: %d, msg: %s", + fullUrl, statusCode, httpClientResponse.statusMessage() + )); + } + + return httpClientResponse.body().compose(x -> Future.succeededFuture(x.toJsonObject().mapTo(ProxyResult.class))); + }) + .onFailure(t -> log.warn("[HttpProxyService] sendProxyRequest to url[{}] failed,msg: {}", fullUrl, ExceptionUtils.getMessage(t))) + .toCompletionStage(); + + } + + private Vertx vertx() { + + if (_vertx != null) { + return _vertx; + } + + synchronized (this) { + if (_vertx == null) { + VertxOptions options = new VertxOptions().setWorkerPoolSize(32).setInternalBlockingPoolSize(32); + _vertx = Vertx.vertx(options); + } + } + return _vertx; + } + + + class ProxyTransporter implements Transporter { + + private final ServerType myServerType; + + public ProxyTransporter(ServerType myServerType) { + this.myServerType = myServerType; + } + + @Override + public Protocol getProtocol() { + return transporter.getProtocol(); + } + + @Override + public void tell(URL url, PowerSerializable request) { + if (skipProxy(url)) { + transporter.tell(url, request); + return; + } + + ProxyRequest proxyRequest = new ProxyRequest().setUrl(url).setRequest(request).setProxyMethod(ProxyMethod.TELL.getV()); + sendProxyRequest(proxyRequest); + } + + @Override + @SuppressWarnings("unchecked") + public CompletionStage ask(URL url, PowerSerializable request, Class clz) throws RemotingException { + if (skipProxy(url)) { + return transporter.ask(url, request, clz); + } + ProxyRequest proxyRequest = new ProxyRequest().setUrl(url).setRequest(request).setProxyMethod(ProxyMethod.ASK.getV()); + CompletionStage proxyRequestCompletionStage = sendProxyRequest(proxyRequest); + return proxyRequestCompletionStage.thenApply(pr -> { + if (pr.isSuccess()) { + if (clz == null) { + return null; + } + if (clz.equals(String.class)) { + return (T) pr.getData(); + } + try { + return JsonUtils.parseObject(pr.getData(), clz); + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + } + throw new RemotingException("proxy failed, msg: " + pr.getMsg()); + }); + } + + private boolean skipProxy(URL url) { + + if (proxyConfig == null) { + return true; + } + if (!proxyConfig.isUseProxy()) { + return true; + } + if (StringUtils.isEmpty(proxyConfig.getProxyUrl())) { + return true; + } + + // 仅对向通讯需要使用代理 + return Objects.equals(url.getServerType(), myServerType); + } + } + +} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyRequest.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyRequest.java deleted file mode 100644 index 3a2ae810..00000000 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyRequest.java +++ /dev/null @@ -1,38 +0,0 @@ -package tech.powerjob.remote.framework.proxy; - -import lombok.Data; -import lombok.experimental.Accessors; -import tech.powerjob.common.PowerSerializable; -import tech.powerjob.remote.framework.base.URL; - -import java.io.Serializable; - -/** - * 请求对象 - * - * @author tjq - * @since 2023/11/15 - */ -@Data -@Accessors(chain = true) -public class ProxyRequest implements Serializable { - - public ProxyRequest() { - } - - /** - * 真正地访问地址 - */ - private URL url; - - /** - * 真正地请求数据 - */ - private PowerSerializable request; - - /** - * {@link ProxyMethod} - */ - private Integer proxyMethod; - -} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyService.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyService.java index 1e181a20..e3579e48 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyService.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyService.java @@ -1,8 +1,8 @@ package tech.powerjob.remote.framework.proxy; -import tech.powerjob.remote.framework.cs.ProxyConfig; - -import java.util.concurrent.CompletionStage; +import tech.powerjob.remote.framework.base.ServerType; +import tech.powerjob.remote.framework.engine.config.ProxyConfig; +import tech.powerjob.remote.framework.transporter.Transporter; /** * 代理服务 @@ -16,12 +16,7 @@ public interface ProxyService { * 初始化 * @param proxyConfig 代理服务 */ - void initialize(ProxyConfig proxyConfig); + void initializeProxyServer(ProxyConfig proxyConfig); - /** - * 代理请求 - * @param proxyRequest 代理请求 - * @return 代理响应 - */ - CompletionStage sendProxyRequest(ProxyRequest proxyRequest); + Transporter warpProxyTransporter(ServerType currentServerType); } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyTransporter.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyTransporter.java deleted file mode 100644 index 503f7103..00000000 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyTransporter.java +++ /dev/null @@ -1,96 +0,0 @@ -package tech.powerjob.remote.framework.proxy; - -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; -import tech.powerjob.common.PowerSerializable; -import tech.powerjob.common.serialize.JsonUtils; -import tech.powerjob.remote.framework.base.RemotingException; -import tech.powerjob.remote.framework.base.URL; -import tech.powerjob.remote.framework.cs.CSInitializerConfig; -import tech.powerjob.remote.framework.cs.ProxyConfig; -import tech.powerjob.remote.framework.transporter.Protocol; -import tech.powerjob.remote.framework.transporter.Transporter; - -import java.util.Objects; -import java.util.concurrent.CompletionStage; - -/** - * 具有代理功能的通讯器 - * - * @author tjq - * @since 2023/11/15 - */ -@Slf4j -public class ProxyTransporter implements Transporter { - - private final ProxyService proxyService; - private final Transporter realTransporter; - private final CSInitializerConfig csInitializerConfig; - - public ProxyTransporter(ProxyService proxyService, Transporter realTransporter, CSInitializerConfig csInitializerConfig) { - this.proxyService = proxyService; - this.realTransporter = realTransporter; - this.csInitializerConfig = csInitializerConfig; - } - - @Override - public Protocol getProtocol() { - return realTransporter.getProtocol(); - } - - @Override - public void tell(URL url, PowerSerializable request) { - if (skipProxy(url)) { - realTransporter.tell(url, request); - return; - } - - ProxyRequest proxyRequest = new ProxyRequest().setUrl(url).setRequest(request).setProxyMethod(ProxyMethod.TELL.getV()); - proxyService.sendProxyRequest(proxyRequest); - } - - @Override - @SuppressWarnings("unchecked") - public CompletionStage ask(URL url, PowerSerializable request, Class clz) throws RemotingException { - if (skipProxy(url)) { - return realTransporter.ask(url, request, clz); - } - ProxyRequest proxyRequest = new ProxyRequest().setUrl(url).setRequest(request).setProxyMethod(ProxyMethod.ASK.getV()); - CompletionStage proxyRequestCompletionStage = proxyService.sendProxyRequest(proxyRequest); - return proxyRequestCompletionStage.thenApply(pr -> { - if (pr.isSuccess()) { - if (clz == null) { - return null; - } - if (clz.equals(String.class)) { - return (T) pr.getData(); - } - try { - return JsonUtils.parseObject(pr.getData(), clz); - } catch (Exception e) { - ExceptionUtils.rethrow(e); - } - } - throw new RemotingException("proxy failed, msg: " + pr.getMsg()); - }); - } - - private boolean skipProxy(URL url) { - - ProxyConfig proxyConfig = csInitializerConfig.getProxyConfig(); - if (proxyConfig == null) { - return true; - } - if (!proxyConfig.isUseProxy()) { - return true; - } - if (StringUtils.isEmpty(proxyConfig.getProxyUrl())) { - return true; - } - - // 仅对向通讯需要使用代理 - return Objects.equals(url.getServerType(), csInitializerConfig.getServerType()); - } - -} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyMethod.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyMethod.java similarity index 90% rename from powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyMethod.java rename to powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyMethod.java index 2951f6f7..366255ea 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyMethod.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyMethod.java @@ -1,4 +1,4 @@ -package tech.powerjob.remote.framework.proxy; +package tech.powerjob.remote.framework.proxy.module; import lombok.AllArgsConstructor; import lombok.Getter; diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyRequest.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyRequest.java new file mode 100644 index 00000000..6d8fadad --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyRequest.java @@ -0,0 +1,67 @@ +package tech.powerjob.remote.framework.proxy.module; + +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.remote.framework.base.URL; + +import java.io.Serializable; + +/** + * 请求对象 + * + * @author tjq + * @since 2023/11/15 + */ +public class ProxyRequest implements Serializable { + + public ProxyRequest() { + } + + /** + * 真正地访问地址 + */ + private URL url; + + /** + * 真正地请求数据 + */ + private String request; + + private Class clz; + + /** + * {@link ProxyMethod} + */ + private Integer proxyMethod; + + public URL getUrl() { + return url; + } + + public ProxyRequest setUrl(URL url) { + this.url = url; + return this; + } + + public String getRequest() { + return request; + } + + public ProxyRequest setRequest(Object request) { + this.request = JsonUtils.toJSONString(request); + this.clz = request.getClass(); + return this; + } + + public Class getClz() { + return clz; + } + + public Integer getProxyMethod() { + return proxyMethod; + } + + public ProxyRequest setProxyMethod(Integer proxyMethod) { + this.proxyMethod = proxyMethod; + return this; + } +} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyResult.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyResult.java similarity index 86% rename from powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyResult.java rename to powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyResult.java index ba0ce92d..b6ca3f9b 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyResult.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyResult.java @@ -1,4 +1,4 @@ -package tech.powerjob.remote.framework.proxy; +package tech.powerjob.remote.framework.proxy.module; import lombok.Data; import lombok.experimental.Accessors; diff --git a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/engine/RemoteEngineTest.java b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/engine/RemoteEngineTest.java index eaf8c79d..233620ef 100644 --- a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/engine/RemoteEngineTest.java +++ b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/engine/RemoteEngineTest.java @@ -1,12 +1,10 @@ package tech.powerjob.remote.framework.engine; -import com.google.common.collect.Sets; import org.junit.jupiter.api.Test; import tech.powerjob.remote.framework.base.Address; +import tech.powerjob.remote.framework.engine.config.EngineConfig; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; -import static org.junit.jupiter.api.Assertions.*; - /** * RemoteEngineTest * diff --git a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/proxy/HttpProxyServiceTest.java b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/proxy/HttpProxyServiceTest.java new file mode 100644 index 00000000..59fe08eb --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/proxy/HttpProxyServiceTest.java @@ -0,0 +1,58 @@ +package tech.powerjob.remote.framework.proxy; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import tech.powerjob.common.response.AskResponse; +import tech.powerjob.remote.framework.base.URL; +import tech.powerjob.remote.framework.engine.config.ProxyConfig; +import tech.powerjob.remote.framework.proxy.module.ProxyMethod; +import tech.powerjob.remote.framework.proxy.module.ProxyRequest; +import tech.powerjob.remote.framework.proxy.module.ProxyResult; +import tech.powerjob.remote.framework.test.TestTransporter; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +/** + * HttpProxyServiceTest + * + * @author tjq + * @since 2023/11/16 + */ +@Slf4j +class HttpProxyServiceTest { + + @Test + @SneakyThrows + void testHttpProxyService() { + + ProxyConfig proxyConfig = new ProxyConfig(); + proxyConfig.setEnableProxyServer(true); + + proxyConfig.setProxyUrl("http://127.0.0.1:9999"); + proxyConfig.setUseProxy(true); + + + HttpProxyService httpProxyService = new HttpProxyService(new TestTransporter()); + + httpProxyService.initializeProxyServer(proxyConfig); + + URL url = new URL(); + AskResponse askResponse = new AskResponse(); + askResponse.setMessage("from test"); + + ProxyRequest tellProxyRequest = new ProxyRequest().setUrl(url).setRequest(askResponse).setProxyMethod(ProxyMethod.TELL.getV()); + CompletionStage tellProxyResultCompletionStage = httpProxyService.sendProxyRequest(tellProxyRequest); + ProxyResult tellProxyResult = tellProxyResultCompletionStage.toCompletableFuture().get(1, TimeUnit.SECONDS); + log.info("[HttpProxyServiceTest] tellProxyResult: {}", tellProxyResult); + + ProxyRequest askProxyRequest = new ProxyRequest().setUrl(url).setRequest(askResponse).setProxyMethod(ProxyMethod.ASK.getV()); + CompletionStage askProxyResultCompletionStage = httpProxyService.sendProxyRequest(askProxyRequest); + ProxyResult askProxyResult = askProxyResultCompletionStage.toCompletableFuture().get(1, TimeUnit.SECONDS); + log.info("[HttpProxyServiceTest] askProxyResult: {}", askProxyResult); + + Thread.sleep(1000); + } + +} \ No newline at end of file diff --git a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/test/TestTransporter.java b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/test/TestTransporter.java new file mode 100644 index 00000000..449456d5 --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/test/TestTransporter.java @@ -0,0 +1,41 @@ +package tech.powerjob.remote.framework.test; + +import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.PowerSerializable; +import tech.powerjob.common.response.AskResponse; +import tech.powerjob.remote.framework.base.RemotingException; +import tech.powerjob.remote.framework.base.URL; +import tech.powerjob.remote.framework.transporter.Protocol; +import tech.powerjob.remote.framework.transporter.Transporter; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +/** + * TestTransporter + * + * @author tjq + * @since 2023/11/16 + */ +@Slf4j +public class TestTransporter implements Transporter { + @Override + public Protocol getProtocol() { + return null; + } + + @Override + public void tell(URL url, PowerSerializable request) { + log.info("[TestTransporter] invoke tell, url: {}, request: {}", url, request); + } + + @Override + @SuppressWarnings("unchecked") + public CompletionStage ask(URL url, PowerSerializable request, Class clz) throws RemotingException { + log.info("[TestTransporter] invoke ask, url: {}, request: {}", url, request); + AskResponse askResponse = new AskResponse(); + askResponse.setSuccess(true); + askResponse.setMessage("FromTestTransporter, force success"); + return (CompletionStage) CompletableFuture.completedFuture(askResponse); + } +} diff --git a/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java b/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java index 4df8ebc6..08a7432f 100644 --- a/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java +++ b/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java @@ -9,7 +9,7 @@ import tech.powerjob.remote.framework.BenchmarkActor; import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.HandlerLocation; import tech.powerjob.remote.framework.base.URL; -import tech.powerjob.remote.framework.engine.EngineConfig; +import tech.powerjob.remote.framework.engine.config.EngineConfig; import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.RemoteEngine; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java index 331a83ad..7f106558 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java @@ -22,7 +22,7 @@ import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.RemotingException; import tech.powerjob.remote.framework.base.ServerType; import tech.powerjob.remote.framework.base.URL; -import tech.powerjob.remote.framework.engine.EngineConfig; +import tech.powerjob.remote.framework.engine.config.EngineConfig; import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.RemoteEngine; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index eceac34c..6712f0b6 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -10,7 +10,7 @@ import tech.powerjob.common.utils.NetUtils; import tech.powerjob.common.utils.PropertyUtils; import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.ServerType; -import tech.powerjob.remote.framework.engine.EngineConfig; +import tech.powerjob.remote.framework.engine.config.EngineConfig; import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.RemoteEngine; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;