feat: remote framework support Proxy

This commit is contained in:
tjq 2023-11-15 23:12:57 +08:00
parent 9b7c237cf0
commit 5435cf136f
9 changed files with 274 additions and 4 deletions

View File

@ -23,8 +23,9 @@ public class BenchmarkActor {
@Handler(path = "standard")
public BenchmarkResponse standardRequest(BenchmarkRequest request) {
String id = request.getId();
long startTs = System.currentTimeMillis();
log.info("[BenchmarkActor] [standardRequest] receive request: {}", request);
log.info("[BenchmarkActor] [standardRequest] [{}] receive request: {}", id, request);
BenchmarkResponse response = new BenchmarkResponse()
.setSuccess(true)
.setContent(request.getContent())
@ -35,6 +36,7 @@ public class BenchmarkActor {
}
executeSleep(request);
response.setServerCost(System.currentTimeMillis() - startTs);
log.info("[BenchmarkActor] [standardRequest] [{}] finished process, start to return.", id);
return response;
}
@ -61,6 +63,7 @@ public class BenchmarkActor {
@Data
@Accessors(chain = true)
public static class BenchmarkRequest implements PowerSerializable {
private String id;
/**
* 请求内容
*/

View File

@ -22,4 +22,6 @@ public class CSInitializerConfig implements Serializable {
private Address bindAddress;
private ServerType serverType;
private ProxyConfig proxyConfig;
}

View File

@ -0,0 +1,37 @@
package tech.powerjob.remote.framework.cs;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* 代理配置
*
* @author tjq
* @since 2023/11/15
*/
@Data
@Accessors(chain = true)
public class ProxyConfig implements Serializable {
/**
* 本机是否初始化代理服务器
* 当其他服务需要通过代理服务访问本机时设置为 true会在本机开启端口提供转发服务
*/
private boolean enableProxyServer;
/**
* 本机启动的代理服务器端口 enableProxyServer true 时有效
*/
private Integer proxyServerPort;
/* ******************* 上述配置是本机自身行为,下面的配置是对外的访问行为,请勿混淆 ******************* */
/**
* 是否启用代理去访问其他节点
*/
private boolean useProxy;
/**
* 访问其他节点的代理服务器完整的地址
* 域名 OR IP:port
*/
private String proxyUrl;
}

View File

@ -0,0 +1,31 @@
package tech.powerjob.remote.framework.proxy;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 代理方法
*
* @author tjq
* @since 2023/11/15
*/
@Getter
@AllArgsConstructor
public enum ProxyMethod {
TELL(1),
ASK(2)
;
private final Integer v;
public static ProxyMethod of(Integer vv) {
for (ProxyMethod proxyMethod : values()) {
if (proxyMethod.v.equals(vv)) {
return proxyMethod;
}
}
throw new IllegalArgumentException("can't find ProxyMethod by " + vv);
}
}

View File

@ -0,0 +1,38 @@
package tech.powerjob.remote.framework.proxy;
import lombok.Data;
import lombok.experimental.Accessors;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.remote.framework.base.URL;
import java.io.Serializable;
/**
* 请求对象
*
* @author tjq
* @since 2023/11/15
*/
@Data
@Accessors(chain = true)
public class ProxyRequest implements Serializable {
public ProxyRequest() {
}
/**
* 真正地访问地址
*/
private URL url;
/**
* 真正地请求数据
*/
private PowerSerializable request;
/**
* {@link ProxyMethod}
*/
private Integer proxyMethod;
}

View File

@ -0,0 +1,26 @@
package tech.powerjob.remote.framework.proxy;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* 代理结果
*
* @author tjq
* @since 2023/11/15
*/
@Data
@Accessors(chain = true)
public class ProxyResult implements Serializable {
public ProxyResult() {
}
private boolean success;
private String data;
private String msg;
}

View File

@ -0,0 +1,27 @@
package tech.powerjob.remote.framework.proxy;
import tech.powerjob.remote.framework.cs.ProxyConfig;
import java.util.concurrent.CompletionStage;
/**
* 代理服务
*
* @author tjq
* @since 2023/11/15
*/
public interface ProxyService {
/**
* 初始化
* @param proxyConfig 代理服务
*/
void initialize(ProxyConfig proxyConfig);
/**
* 代理请求
* @param proxyRequest 代理请求
* @return 代理响应
*/
CompletionStage<ProxyResult> sendProxyRequest(ProxyRequest proxyRequest);
}

View File

@ -0,0 +1,96 @@
package tech.powerjob.remote.framework.proxy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.cs.ProxyConfig;
import tech.powerjob.remote.framework.transporter.Protocol;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
/**
* 具有代理功能的通讯器
*
* @author tjq
* @since 2023/11/15
*/
@Slf4j
public class ProxyTransporter implements Transporter {
private final ProxyService proxyService;
private final Transporter realTransporter;
private final CSInitializerConfig csInitializerConfig;
public ProxyTransporter(ProxyService proxyService, Transporter realTransporter, CSInitializerConfig csInitializerConfig) {
this.proxyService = proxyService;
this.realTransporter = realTransporter;
this.csInitializerConfig = csInitializerConfig;
}
@Override
public Protocol getProtocol() {
return realTransporter.getProtocol();
}
@Override
public void tell(URL url, PowerSerializable request) {
if (skipProxy(url)) {
realTransporter.tell(url, request);
return;
}
ProxyRequest proxyRequest = new ProxyRequest().setUrl(url).setRequest(request).setProxyMethod(ProxyMethod.TELL.getV());
proxyService.sendProxyRequest(proxyRequest);
}
@Override
@SuppressWarnings("unchecked")
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
if (skipProxy(url)) {
return realTransporter.ask(url, request, clz);
}
ProxyRequest proxyRequest = new ProxyRequest().setUrl(url).setRequest(request).setProxyMethod(ProxyMethod.ASK.getV());
CompletionStage<ProxyResult> proxyRequestCompletionStage = proxyService.sendProxyRequest(proxyRequest);
return proxyRequestCompletionStage.thenApply(pr -> {
if (pr.isSuccess()) {
if (clz == null) {
return null;
}
if (clz.equals(String.class)) {
return (T) pr.getData();
}
try {
return JsonUtils.parseObject(pr.getData(), clz);
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
}
throw new RemotingException("proxy failed, msg: " + pr.getMsg());
});
}
private boolean skipProxy(URL url) {
ProxyConfig proxyConfig = csInitializerConfig.getProxyConfig();
if (proxyConfig == null) {
return true;
}
if (!proxyConfig.isUseProxy()) {
return true;
}
if (StringUtils.isEmpty(proxyConfig.getProxyUrl())) {
return true;
}
// 仅对向通讯需要使用代理
return Objects.equals(url.getServerType(), csInitializerConfig.getServerType());
}
}

View File

@ -15,6 +15,7 @@ import tech.powerjob.remote.framework.engine.RemoteEngine;
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
@ -45,26 +46,35 @@ class HttpVertxCSInitializerTest {
BenchmarkActor.BenchmarkRequest request = new BenchmarkActor.BenchmarkRequest()
.setContent("request from test")
.setBlockingMills(100)
.setResponseSize(10240);
.setResponseSize(1024);
log.info("[HttpVertxCSInitializerTest] test empty request!");
URL emptyURL = new URL()
.setAddress(address)
.setLocation(new HandlerLocation().setMethodPath("emptyReturn").setRootPath("benchmark"));
request.setId(UUID.randomUUID().toString());
long s1 = System.currentTimeMillis();
transporter.tell(emptyURL, request);
log.info("[HttpVertxCSInitializerTest] test empty request, tell cost: {}ms", System.currentTimeMillis() - s1);
log.info("[HttpVertxCSInitializerTest] test string request!");
URL stringURL = new URL()
.setAddress(address)
.setLocation(new HandlerLocation().setMethodPath("stringReturn").setRootPath("benchmark"));
final String strResponse = transporter.ask(stringURL, request, String.class).toCompletableFuture().get();
log.info("[HttpVertxCSInitializerTest] strResponse: {}", strResponse);
request.setId(UUID.randomUUID().toString());
long s2 = System.currentTimeMillis();
CompletionStage<String> stringCompletionStage = transporter.ask(stringURL, request, String.class);
long s3 = System.currentTimeMillis();
final String strResponse = stringCompletionStage.toCompletableFuture().get();
long s4 = System.currentTimeMillis();
log.info("[HttpVertxCSInitializerTest] askCost:{}, waitCost:{}, strResponse: {}", s3-s2, s4-s3, strResponse);
log.info("[HttpVertxCSInitializerTest] test normal request!");
URL url = new URL()
.setAddress(address)
.setLocation(new HandlerLocation().setMethodPath("standard").setRootPath("benchmark"));
request.setId(UUID.randomUUID().toString());
final CompletionStage<BenchmarkActor.BenchmarkResponse> benchmarkResponseCompletionStage = transporter.ask(url, request, BenchmarkActor.BenchmarkResponse.class);
final BenchmarkActor.BenchmarkResponse response = benchmarkResponseCompletionStage.toCompletableFuture().get(10, TimeUnit.SECONDS);
log.info("[HttpVertxCSInitializerTest] response: {}", response);