From 5435cf136f50118c570a67f26108395ecafbb5f3 Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 15 Nov 2023 23:12:57 +0800 Subject: [PATCH] feat: remote framework support Proxy --- .../remote/framework/BenchmarkActor.java | 5 +- .../framework/cs/CSInitializerConfig.java | 2 + .../remote/framework/cs/ProxyConfig.java | 37 +++++++ .../remote/framework/proxy/ProxyMethod.java | 31 ++++++ .../remote/framework/proxy/ProxyRequest.java | 38 ++++++++ .../remote/framework/proxy/ProxyResult.java | 26 +++++ .../remote/framework/proxy/ProxyService.java | 27 ++++++ .../framework/proxy/ProxyTransporter.java | 96 +++++++++++++++++++ .../http/HttpVertxCSInitializerTest.java | 16 +++- 9 files changed, 274 insertions(+), 4 deletions(-) create mode 100644 powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/ProxyConfig.java create mode 100644 powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyMethod.java create mode 100644 powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyRequest.java create mode 100644 powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyResult.java create mode 100644 powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyService.java create mode 100644 powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyTransporter.java diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/BenchmarkActor.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/BenchmarkActor.java index 325aa6e2..36e3591e 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/BenchmarkActor.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/BenchmarkActor.java @@ -23,8 +23,9 @@ public class BenchmarkActor { @Handler(path = "standard") public BenchmarkResponse standardRequest(BenchmarkRequest request) { + String id = request.getId(); long startTs = System.currentTimeMillis(); - log.info("[BenchmarkActor] [standardRequest] receive request: {}", request); + log.info("[BenchmarkActor] [standardRequest] [{}] receive request: {}", id, request); BenchmarkResponse response = new BenchmarkResponse() .setSuccess(true) .setContent(request.getContent()) @@ -35,6 +36,7 @@ public class BenchmarkActor { } executeSleep(request); response.setServerCost(System.currentTimeMillis() - startTs); + log.info("[BenchmarkActor] [standardRequest] [{}] finished process, start to return.", id); return response; } @@ -61,6 +63,7 @@ public class BenchmarkActor { @Data @Accessors(chain = true) public static class BenchmarkRequest implements PowerSerializable { + private String id; /** * 请求内容 */ diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java index a64051fd..50d22c3f 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java @@ -22,4 +22,6 @@ public class CSInitializerConfig implements Serializable { private Address bindAddress; private ServerType serverType; + + private ProxyConfig proxyConfig; } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/ProxyConfig.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/ProxyConfig.java new file mode 100644 index 00000000..6a1822a1 --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/ProxyConfig.java @@ -0,0 +1,37 @@ +package tech.powerjob.remote.framework.cs; + +import lombok.Data; +import lombok.experimental.Accessors; + +import java.io.Serializable; + +/** + * 代理配置 + * + * @author tjq + * @since 2023/11/15 + */ +@Data +@Accessors(chain = true) +public class ProxyConfig implements Serializable { + /** + * 本机是否初始化代理服务器 + * 当其他服务需要通过代理服务访问本机时,设置为 true,会在本机开启端口提供转发服务 + */ + private boolean enableProxyServer; + /** + * 本机启动的代理服务器端口,当 enableProxyServer 为 true 时有效 + */ + private Integer proxyServerPort; + + /* ******************* 上述配置是本机自身行为,下面的配置是对外的访问行为,请勿混淆 ******************* */ + /** + * 是否启用代理(去访问其他节点) + */ + private boolean useProxy; + /** + * (访问其他节点的)代理服务器完整的地址 + * 域名 OR IP:port + */ + private String proxyUrl; +} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyMethod.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyMethod.java new file mode 100644 index 00000000..2951f6f7 --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyMethod.java @@ -0,0 +1,31 @@ +package tech.powerjob.remote.framework.proxy; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 代理方法 + * + * @author tjq + * @since 2023/11/15 + */ +@Getter +@AllArgsConstructor +public enum ProxyMethod { + + TELL(1), + + ASK(2) + ; + + private final Integer v; + + public static ProxyMethod of(Integer vv) { + for (ProxyMethod proxyMethod : values()) { + if (proxyMethod.v.equals(vv)) { + return proxyMethod; + } + } + throw new IllegalArgumentException("can't find ProxyMethod by " + vv); + } +} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyRequest.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyRequest.java new file mode 100644 index 00000000..3a2ae810 --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyRequest.java @@ -0,0 +1,38 @@ +package tech.powerjob.remote.framework.proxy; + +import lombok.Data; +import lombok.experimental.Accessors; +import tech.powerjob.common.PowerSerializable; +import tech.powerjob.remote.framework.base.URL; + +import java.io.Serializable; + +/** + * 请求对象 + * + * @author tjq + * @since 2023/11/15 + */ +@Data +@Accessors(chain = true) +public class ProxyRequest implements Serializable { + + public ProxyRequest() { + } + + /** + * 真正地访问地址 + */ + private URL url; + + /** + * 真正地请求数据 + */ + private PowerSerializable request; + + /** + * {@link ProxyMethod} + */ + private Integer proxyMethod; + +} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyResult.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyResult.java new file mode 100644 index 00000000..ba0ce92d --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyResult.java @@ -0,0 +1,26 @@ +package tech.powerjob.remote.framework.proxy; + +import lombok.Data; +import lombok.experimental.Accessors; + +import java.io.Serializable; + +/** + * 代理结果 + * + * @author tjq + * @since 2023/11/15 + */ +@Data +@Accessors(chain = true) +public class ProxyResult implements Serializable { + + public ProxyResult() { + } + + private boolean success; + + private String data; + + private String msg; +} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyService.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyService.java new file mode 100644 index 00000000..1e181a20 --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyService.java @@ -0,0 +1,27 @@ +package tech.powerjob.remote.framework.proxy; + +import tech.powerjob.remote.framework.cs.ProxyConfig; + +import java.util.concurrent.CompletionStage; + +/** + * 代理服务 + * + * @author tjq + * @since 2023/11/15 + */ +public interface ProxyService { + + /** + * 初始化 + * @param proxyConfig 代理服务 + */ + void initialize(ProxyConfig proxyConfig); + + /** + * 代理请求 + * @param proxyRequest 代理请求 + * @return 代理响应 + */ + CompletionStage sendProxyRequest(ProxyRequest proxyRequest); +} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyTransporter.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyTransporter.java new file mode 100644 index 00000000..503f7103 --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/ProxyTransporter.java @@ -0,0 +1,96 @@ +package tech.powerjob.remote.framework.proxy; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import tech.powerjob.common.PowerSerializable; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.remote.framework.base.RemotingException; +import tech.powerjob.remote.framework.base.URL; +import tech.powerjob.remote.framework.cs.CSInitializerConfig; +import tech.powerjob.remote.framework.cs.ProxyConfig; +import tech.powerjob.remote.framework.transporter.Protocol; +import tech.powerjob.remote.framework.transporter.Transporter; + +import java.util.Objects; +import java.util.concurrent.CompletionStage; + +/** + * 具有代理功能的通讯器 + * + * @author tjq + * @since 2023/11/15 + */ +@Slf4j +public class ProxyTransporter implements Transporter { + + private final ProxyService proxyService; + private final Transporter realTransporter; + private final CSInitializerConfig csInitializerConfig; + + public ProxyTransporter(ProxyService proxyService, Transporter realTransporter, CSInitializerConfig csInitializerConfig) { + this.proxyService = proxyService; + this.realTransporter = realTransporter; + this.csInitializerConfig = csInitializerConfig; + } + + @Override + public Protocol getProtocol() { + return realTransporter.getProtocol(); + } + + @Override + public void tell(URL url, PowerSerializable request) { + if (skipProxy(url)) { + realTransporter.tell(url, request); + return; + } + + ProxyRequest proxyRequest = new ProxyRequest().setUrl(url).setRequest(request).setProxyMethod(ProxyMethod.TELL.getV()); + proxyService.sendProxyRequest(proxyRequest); + } + + @Override + @SuppressWarnings("unchecked") + public CompletionStage ask(URL url, PowerSerializable request, Class clz) throws RemotingException { + if (skipProxy(url)) { + return realTransporter.ask(url, request, clz); + } + ProxyRequest proxyRequest = new ProxyRequest().setUrl(url).setRequest(request).setProxyMethod(ProxyMethod.ASK.getV()); + CompletionStage proxyRequestCompletionStage = proxyService.sendProxyRequest(proxyRequest); + return proxyRequestCompletionStage.thenApply(pr -> { + if (pr.isSuccess()) { + if (clz == null) { + return null; + } + if (clz.equals(String.class)) { + return (T) pr.getData(); + } + try { + return JsonUtils.parseObject(pr.getData(), clz); + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + } + throw new RemotingException("proxy failed, msg: " + pr.getMsg()); + }); + } + + private boolean skipProxy(URL url) { + + ProxyConfig proxyConfig = csInitializerConfig.getProxyConfig(); + if (proxyConfig == null) { + return true; + } + if (!proxyConfig.isUseProxy()) { + return true; + } + if (StringUtils.isEmpty(proxyConfig.getProxyUrl())) { + return true; + } + + // 仅对向通讯需要使用代理 + return Objects.equals(url.getServerType(), csInitializerConfig.getServerType()); + } + +} diff --git a/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java b/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java index a009293b..4df8ebc6 100644 --- a/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java +++ b/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java @@ -15,6 +15,7 @@ import tech.powerjob.remote.framework.engine.RemoteEngine; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; import tech.powerjob.remote.framework.transporter.Transporter; +import java.util.UUID; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -45,26 +46,35 @@ class HttpVertxCSInitializerTest { BenchmarkActor.BenchmarkRequest request = new BenchmarkActor.BenchmarkRequest() .setContent("request from test") .setBlockingMills(100) - .setResponseSize(10240); + .setResponseSize(1024); log.info("[HttpVertxCSInitializerTest] test empty request!"); URL emptyURL = new URL() .setAddress(address) .setLocation(new HandlerLocation().setMethodPath("emptyReturn").setRootPath("benchmark")); + request.setId(UUID.randomUUID().toString()); + long s1 = System.currentTimeMillis(); transporter.tell(emptyURL, request); + log.info("[HttpVertxCSInitializerTest] test empty request, tell cost: {}ms", System.currentTimeMillis() - s1); log.info("[HttpVertxCSInitializerTest] test string request!"); URL stringURL = new URL() .setAddress(address) .setLocation(new HandlerLocation().setMethodPath("stringReturn").setRootPath("benchmark")); - final String strResponse = transporter.ask(stringURL, request, String.class).toCompletableFuture().get(); - log.info("[HttpVertxCSInitializerTest] strResponse: {}", strResponse); + request.setId(UUID.randomUUID().toString()); + long s2 = System.currentTimeMillis(); + CompletionStage stringCompletionStage = transporter.ask(stringURL, request, String.class); + long s3 = System.currentTimeMillis(); + final String strResponse = stringCompletionStage.toCompletableFuture().get(); + long s4 = System.currentTimeMillis(); + log.info("[HttpVertxCSInitializerTest] askCost:{}, waitCost:{}, strResponse: {}", s3-s2, s4-s3, strResponse); log.info("[HttpVertxCSInitializerTest] test normal request!"); URL url = new URL() .setAddress(address) .setLocation(new HandlerLocation().setMethodPath("standard").setRootPath("benchmark")); + request.setId(UUID.randomUUID().toString()); final CompletionStage benchmarkResponseCompletionStage = transporter.ask(url, request, BenchmarkActor.BenchmarkResponse.class); final BenchmarkActor.BenchmarkResponse response = benchmarkResponseCompletionStage.toCompletableFuture().get(10, TimeUnit.SECONDS); log.info("[HttpVertxCSInitializerTest] response: {}", response);