mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: HttpProxyService
This commit is contained in:
parent
b741302da5
commit
7c555ae731
@ -50,6 +50,7 @@
|
|||||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||||
|
|
||||||
<vertx.version>4.4.6</vertx.version>
|
<vertx.version>4.4.6</vertx.version>
|
||||||
|
<jackson.version>2.15.0</jackson.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
@ -64,6 +65,12 @@
|
|||||||
<artifactId>vertx-web</artifactId>
|
<artifactId>vertx-web</artifactId>
|
||||||
<version>${vertx.version}</version>
|
<version>${vertx.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-databind</artifactId>
|
||||||
|
<version>${jackson.version}</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<profiles>
|
<profiles>
|
||||||
|
@ -7,7 +7,7 @@ import tech.powerjob.common.enums.Protocol;
|
|||||||
import tech.powerjob.remote.framework.BenchmarkActor;
|
import tech.powerjob.remote.framework.BenchmarkActor;
|
||||||
import tech.powerjob.remote.framework.base.Address;
|
import tech.powerjob.remote.framework.base.Address;
|
||||||
import tech.powerjob.remote.framework.base.ServerType;
|
import tech.powerjob.remote.framework.base.ServerType;
|
||||||
import tech.powerjob.remote.framework.engine.EngineConfig;
|
import tech.powerjob.remote.framework.engine.config.EngineConfig;
|
||||||
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
|
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
|
||||||
import tech.powerjob.remote.framework.transporter.Transporter;
|
import tech.powerjob.remote.framework.transporter.Transporter;
|
||||||
|
|
||||||
|
@ -18,9 +18,9 @@
|
|||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
|
|
||||||
<powerjob-common.version>4.3.6</powerjob-common.version>
|
<powerjob-common.version>4.3.6</powerjob-common.version>
|
||||||
|
|
||||||
<reflections.version>0.10.2</reflections.version>
|
<reflections.version>0.10.2</reflections.version>
|
||||||
|
<powerjob-shade-vertx.version>4.4.6</powerjob-shade-vertx.version>
|
||||||
|
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
@ -36,6 +36,18 @@
|
|||||||
<version>${reflections.version}</version>
|
<version>${reflections.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>tech.powerjob</groupId>
|
||||||
|
<artifactId>powerjob-shade-vertx</artifactId>
|
||||||
|
<version>${powerjob-shade-vertx.version}</version>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>*</groupId>
|
||||||
|
<artifactId>*</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
@ -22,6 +22,4 @@ public class CSInitializerConfig implements Serializable {
|
|||||||
private Address bindAddress;
|
private Address bindAddress;
|
||||||
|
|
||||||
private ServerType serverType;
|
private ServerType serverType;
|
||||||
|
|
||||||
private ProxyConfig proxyConfig;
|
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
package tech.powerjob.remote.framework.engine;
|
package tech.powerjob.remote.framework.engine;
|
||||||
|
|
||||||
|
import tech.powerjob.remote.framework.engine.config.EngineConfig;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package tech.powerjob.remote.framework.engine;
|
package tech.powerjob.remote.framework.engine.config;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.experimental.Accessors;
|
import lombok.experimental.Accessors;
|
||||||
@ -30,6 +30,10 @@ public class EngineConfig implements Serializable {
|
|||||||
* 绑定的本地地址
|
* 绑定的本地地址
|
||||||
*/
|
*/
|
||||||
private Address bindAddress;
|
private Address bindAddress;
|
||||||
|
/**
|
||||||
|
* 代理配置
|
||||||
|
*/
|
||||||
|
private ProxyConfig proxyConfig;
|
||||||
/**
|
/**
|
||||||
* actor实例,交由使用侧自己实例化以便自行注入各种 bean
|
* actor实例,交由使用侧自己实例化以便自行注入各种 bean
|
||||||
*/
|
*/
|
@ -1,4 +1,4 @@
|
|||||||
package tech.powerjob.remote.framework.cs;
|
package tech.powerjob.remote.framework.engine.config;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.experimental.Accessors;
|
import lombok.experimental.Accessors;
|
||||||
@ -22,7 +22,7 @@ public class ProxyConfig implements Serializable {
|
|||||||
/**
|
/**
|
||||||
* 本机启动的代理服务器端口,当 enableProxyServer 为 true 时有效
|
* 本机启动的代理服务器端口,当 enableProxyServer 为 true 时有效
|
||||||
*/
|
*/
|
||||||
private Integer proxyServerPort;
|
private Integer proxyServerPort = 9999;
|
||||||
|
|
||||||
/* ******************* 上述配置是本机自身行为,下面的配置是对外的访问行为,请勿混淆 ******************* */
|
/* ******************* 上述配置是本机自身行为,下面的配置是对外的访问行为,请勿混淆 ******************* */
|
||||||
/**
|
/**
|
@ -5,9 +5,12 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import tech.powerjob.remote.framework.actor.ActorInfo;
|
import tech.powerjob.remote.framework.actor.ActorInfo;
|
||||||
import tech.powerjob.remote.framework.cs.CSInitializer;
|
import tech.powerjob.remote.framework.cs.CSInitializer;
|
||||||
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
|
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
|
||||||
import tech.powerjob.remote.framework.engine.EngineConfig;
|
import tech.powerjob.remote.framework.engine.config.ProxyConfig;
|
||||||
|
import tech.powerjob.remote.framework.engine.config.EngineConfig;
|
||||||
import tech.powerjob.remote.framework.engine.EngineOutput;
|
import tech.powerjob.remote.framework.engine.EngineOutput;
|
||||||
import tech.powerjob.remote.framework.engine.RemoteEngine;
|
import tech.powerjob.remote.framework.engine.RemoteEngine;
|
||||||
|
import tech.powerjob.remote.framework.proxy.HttpProxyService;
|
||||||
|
import tech.powerjob.remote.framework.proxy.ProxyService;
|
||||||
import tech.powerjob.remote.framework.transporter.Transporter;
|
import tech.powerjob.remote.framework.transporter.Transporter;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -46,16 +49,21 @@ public class PowerJobRemoteEngine implements RemoteEngine {
|
|||||||
|
|
||||||
// 构建通讯器
|
// 构建通讯器
|
||||||
Transporter transporter = csInitializer.buildTransporter();
|
Transporter transporter = csInitializer.buildTransporter();
|
||||||
engineOutput.setTransporter(transporter);
|
|
||||||
|
|
||||||
log.info("[PowerJobRemoteEngine] [{}] start to bind Handler", engineType);
|
log.info("[PowerJobRemoteEngine] [{}] start to bind Handler", engineType);
|
||||||
actorInfos.forEach(actor -> actor.getHandlerInfos().forEach(handlerInfo -> log.info("[PowerJobRemoteEngine] [{}] PATH={}, handler={}", engineType, handlerInfo.getLocation().toPath(), handlerInfo.getMethod())));
|
actorInfos.forEach(actor -> actor.getHandlerInfos().forEach(handlerInfo -> log.info("[PowerJobRemoteEngine] [{}] PATH={}, handler={}", engineType, handlerInfo.getLocation().toPath(), handlerInfo.getMethod())));
|
||||||
|
|
||||||
// 绑定 handler
|
// 绑定 handler
|
||||||
csInitializer.bindHandlers(actorInfos);
|
csInitializer.bindHandlers(actorInfos);
|
||||||
|
|
||||||
log.info("[PowerJobRemoteEngine] [{}] startup successfully, cost: {}", engineType, sw);
|
log.info("[PowerJobRemoteEngine] [{}] startup successfully, cost: {}", engineType, sw);
|
||||||
|
|
||||||
|
// 处理代理服务器
|
||||||
|
ProxyConfig proxyConfig = engineConfig.getProxyConfig();
|
||||||
|
ProxyService proxyService = new HttpProxyService(transporter);
|
||||||
|
proxyService.initializeProxyServer(proxyConfig);
|
||||||
|
transporter = proxyService.warpProxyTransporter(engineConfig.getServerType());
|
||||||
|
|
||||||
|
engineOutput.setTransporter(transporter);
|
||||||
return engineOutput;
|
return engineOutput;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,239 @@
|
|||||||
|
package tech.powerjob.remote.framework.proxy;
|
||||||
|
|
||||||
|
import lombok.SneakyThrows;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||||
|
import tech.powerjob.common.PowerSerializable;
|
||||||
|
import tech.powerjob.common.serialize.JsonUtils;
|
||||||
|
import tech.powerjob.remote.framework.base.RemotingException;
|
||||||
|
import tech.powerjob.remote.framework.base.ServerType;
|
||||||
|
import tech.powerjob.remote.framework.base.URL;
|
||||||
|
import tech.powerjob.remote.framework.engine.config.ProxyConfig;
|
||||||
|
import tech.powerjob.remote.framework.proxy.module.ProxyMethod;
|
||||||
|
import tech.powerjob.remote.framework.proxy.module.ProxyRequest;
|
||||||
|
import tech.powerjob.remote.framework.proxy.module.ProxyResult;
|
||||||
|
import tech.powerjob.remote.framework.transporter.Protocol;
|
||||||
|
import tech.powerjob.remote.framework.transporter.Transporter;
|
||||||
|
import tech.powerjob.shade.io.netty.handler.codec.http.HttpHeaderNames;
|
||||||
|
import tech.powerjob.shade.io.netty.handler.codec.http.HttpHeaderValues;
|
||||||
|
import tech.powerjob.shade.io.netty.handler.codec.http.HttpResponseStatus;
|
||||||
|
import tech.powerjob.shade.io.vertx.core.Future;
|
||||||
|
import tech.powerjob.shade.io.vertx.core.Vertx;
|
||||||
|
import tech.powerjob.shade.io.vertx.core.VertxOptions;
|
||||||
|
import tech.powerjob.shade.io.vertx.core.http.*;
|
||||||
|
import tech.powerjob.shade.io.vertx.core.json.JsonObject;
|
||||||
|
import tech.powerjob.shade.io.vertx.ext.web.RequestBody;
|
||||||
|
import tech.powerjob.shade.io.vertx.ext.web.Router;
|
||||||
|
import tech.powerjob.shade.io.vertx.ext.web.handler.BodyHandler;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.CompletionStage;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HTTP 代理服务
|
||||||
|
*
|
||||||
|
* @author tjq
|
||||||
|
* @since 2023/11/16
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class HttpProxyService implements ProxyService {
|
||||||
|
|
||||||
|
private Vertx _vertx;
|
||||||
|
private ProxyConfig proxyConfig;
|
||||||
|
|
||||||
|
private final Transporter transporter;
|
||||||
|
|
||||||
|
private static final String PROXY_PATH = "/proxy";
|
||||||
|
|
||||||
|
public HttpProxyService(Transporter transporter) {
|
||||||
|
this.transporter = transporter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SneakyThrows
|
||||||
|
public void initializeProxyServer(ProxyConfig proxyConfig) {
|
||||||
|
|
||||||
|
this.proxyConfig = proxyConfig;
|
||||||
|
if (proxyConfig == null || !proxyConfig.isEnableProxyServer()) {
|
||||||
|
log.info("[HttpProxyService] no proxy server config, skip initialize.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("[HttpProxyService] start to initialize proxy server by proxy config: {}", proxyConfig);
|
||||||
|
|
||||||
|
HttpServerOptions httpServerOptions = new HttpServerOptions().setIdleTimeout(300);
|
||||||
|
HttpServer httpServer = vertx().createHttpServer(httpServerOptions);
|
||||||
|
Router router = Router.router(vertx());
|
||||||
|
router.route().handler(BodyHandler.create());
|
||||||
|
|
||||||
|
router.post(PROXY_PATH).blockingHandler(ctx -> {
|
||||||
|
final RequestBody body = ctx.body();
|
||||||
|
ProxyRequest proxyRequest = body.asPojo(ProxyRequest.class);
|
||||||
|
|
||||||
|
PowerSerializable ret = (PowerSerializable) JsonUtils.parseObjectUnsafe(proxyRequest.getRequest(), proxyRequest.getClz());
|
||||||
|
|
||||||
|
if (ProxyMethod.TELL.getV().equals(proxyRequest.getProxyMethod())) {
|
||||||
|
transporter.tell(proxyRequest.getUrl(), ret);
|
||||||
|
ctx.json(JsonObject.mapFrom(new ProxyResult().setSuccess(true)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ProxyResult proxyResult = new ProxyResult();
|
||||||
|
try {
|
||||||
|
CompletionStage<Object> proxyRequestStage = transporter.ask(proxyRequest.getUrl(), ret, Object.class);
|
||||||
|
Object originResult = proxyRequestStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||||
|
proxyResult.setSuccess(true).setData(JsonUtils.toJSONString(originResult));
|
||||||
|
} catch (Exception e) {
|
||||||
|
proxyResult.setSuccess(false).setMsg(ExceptionUtils.getMessage(e));
|
||||||
|
log.error("[HttpProxyService] proxy request failed!", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.debug("[HttpProxyService] send proxy result: {}", proxyResult);
|
||||||
|
ctx.json(JsonObject.mapFrom(proxyResult));
|
||||||
|
});
|
||||||
|
|
||||||
|
Integer proxyServerPort = proxyConfig.getProxyServerPort();
|
||||||
|
|
||||||
|
httpServer.requestHandler(router)
|
||||||
|
.exceptionHandler(e -> log.error("[HttpProxyService] unknown exception in Actor communication!", e))
|
||||||
|
.listen(proxyServerPort)
|
||||||
|
.toCompletionStage()
|
||||||
|
.toCompletableFuture()
|
||||||
|
.get(1, TimeUnit.MINUTES);
|
||||||
|
|
||||||
|
log.info("[HttpProxyService] initialize proxy server in port: {}", proxyServerPort);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Transporter warpProxyTransporter(ServerType currentServerType) {
|
||||||
|
|
||||||
|
if (proxyConfig.isUseProxy()) {
|
||||||
|
String proxyUrl = proxyConfig.getProxyUrl();
|
||||||
|
if (StringUtils.isEmpty(proxyUrl)) {
|
||||||
|
throw new IllegalArgumentException("when you use proxy, you must set the proxy url(ProxyConfig.proxyUrl)!");
|
||||||
|
}
|
||||||
|
log.info("[HttpProxyService] use proxy to visit other type node, proxy url: {}", proxyUrl);
|
||||||
|
return new ProxyTransporter(currentServerType);
|
||||||
|
}
|
||||||
|
return transporter;
|
||||||
|
}
|
||||||
|
|
||||||
|
CompletionStage<ProxyResult> sendProxyRequest(ProxyRequest proxyRequest) {
|
||||||
|
|
||||||
|
String fullUrl = String.format("%s/%s", proxyConfig.getProxyUrl(), PROXY_PATH);
|
||||||
|
|
||||||
|
HttpClient httpClient = vertx().createHttpClient();
|
||||||
|
RequestOptions requestOptions = new RequestOptions()
|
||||||
|
.setMethod(HttpMethod.POST)
|
||||||
|
.setAbsoluteURI(fullUrl);
|
||||||
|
// 转换 -> 发送请求获取响应
|
||||||
|
Future<HttpClientResponse> responseFuture = httpClient
|
||||||
|
.request(requestOptions)
|
||||||
|
.compose(httpClientRequest ->
|
||||||
|
httpClientRequest
|
||||||
|
.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
||||||
|
.send(JsonObject.mapFrom(proxyRequest).toBuffer())
|
||||||
|
);
|
||||||
|
return responseFuture.compose(httpClientResponse -> {
|
||||||
|
// throw exception
|
||||||
|
final int statusCode = httpClientResponse.statusCode();
|
||||||
|
if (statusCode != HttpResponseStatus.OK.code()) {
|
||||||
|
// CompletableFuture.get() 时会传递抛出该异常
|
||||||
|
throw new RemotingException(String.format("request [%s] failed, status: %d, msg: %s",
|
||||||
|
fullUrl, statusCode, httpClientResponse.statusMessage()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
return httpClientResponse.body().compose(x -> Future.succeededFuture(x.toJsonObject().mapTo(ProxyResult.class)));
|
||||||
|
})
|
||||||
|
.onFailure(t -> log.warn("[HttpProxyService] sendProxyRequest to url[{}] failed,msg: {}", fullUrl, ExceptionUtils.getMessage(t)))
|
||||||
|
.toCompletionStage();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private Vertx vertx() {
|
||||||
|
|
||||||
|
if (_vertx != null) {
|
||||||
|
return _vertx;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized (this) {
|
||||||
|
if (_vertx == null) {
|
||||||
|
VertxOptions options = new VertxOptions().setWorkerPoolSize(32).setInternalBlockingPoolSize(32);
|
||||||
|
_vertx = Vertx.vertx(options);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return _vertx;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class ProxyTransporter implements Transporter {
|
||||||
|
|
||||||
|
private final ServerType myServerType;
|
||||||
|
|
||||||
|
public ProxyTransporter(ServerType myServerType) {
|
||||||
|
this.myServerType = myServerType;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Protocol getProtocol() {
|
||||||
|
return transporter.getProtocol();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void tell(URL url, PowerSerializable request) {
|
||||||
|
if (skipProxy(url)) {
|
||||||
|
transporter.tell(url, request);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ProxyRequest proxyRequest = new ProxyRequest().setUrl(url).setRequest(request).setProxyMethod(ProxyMethod.TELL.getV());
|
||||||
|
sendProxyRequest(proxyRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
|
||||||
|
if (skipProxy(url)) {
|
||||||
|
return transporter.ask(url, request, clz);
|
||||||
|
}
|
||||||
|
ProxyRequest proxyRequest = new ProxyRequest().setUrl(url).setRequest(request).setProxyMethod(ProxyMethod.ASK.getV());
|
||||||
|
CompletionStage<ProxyResult> proxyRequestCompletionStage = sendProxyRequest(proxyRequest);
|
||||||
|
return proxyRequestCompletionStage.thenApply(pr -> {
|
||||||
|
if (pr.isSuccess()) {
|
||||||
|
if (clz == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (clz.equals(String.class)) {
|
||||||
|
return (T) pr.getData();
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return JsonUtils.parseObject(pr.getData(), clz);
|
||||||
|
} catch (Exception e) {
|
||||||
|
ExceptionUtils.rethrow(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new RemotingException("proxy failed, msg: " + pr.getMsg());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean skipProxy(URL url) {
|
||||||
|
|
||||||
|
if (proxyConfig == null) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!proxyConfig.isUseProxy()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (StringUtils.isEmpty(proxyConfig.getProxyUrl())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 仅对向通讯需要使用代理
|
||||||
|
return Objects.equals(url.getServerType(), myServerType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -1,38 +0,0 @@
|
|||||||
package tech.powerjob.remote.framework.proxy;
|
|
||||||
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.experimental.Accessors;
|
|
||||||
import tech.powerjob.common.PowerSerializable;
|
|
||||||
import tech.powerjob.remote.framework.base.URL;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 请求对象
|
|
||||||
*
|
|
||||||
* @author tjq
|
|
||||||
* @since 2023/11/15
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
@Accessors(chain = true)
|
|
||||||
public class ProxyRequest implements Serializable {
|
|
||||||
|
|
||||||
public ProxyRequest() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 真正地访问地址
|
|
||||||
*/
|
|
||||||
private URL url;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 真正地请求数据
|
|
||||||
*/
|
|
||||||
private PowerSerializable request;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link ProxyMethod}
|
|
||||||
*/
|
|
||||||
private Integer proxyMethod;
|
|
||||||
|
|
||||||
}
|
|
@ -1,8 +1,8 @@
|
|||||||
package tech.powerjob.remote.framework.proxy;
|
package tech.powerjob.remote.framework.proxy;
|
||||||
|
|
||||||
import tech.powerjob.remote.framework.cs.ProxyConfig;
|
import tech.powerjob.remote.framework.base.ServerType;
|
||||||
|
import tech.powerjob.remote.framework.engine.config.ProxyConfig;
|
||||||
import java.util.concurrent.CompletionStage;
|
import tech.powerjob.remote.framework.transporter.Transporter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 代理服务
|
* 代理服务
|
||||||
@ -16,12 +16,7 @@ public interface ProxyService {
|
|||||||
* 初始化
|
* 初始化
|
||||||
* @param proxyConfig 代理服务
|
* @param proxyConfig 代理服务
|
||||||
*/
|
*/
|
||||||
void initialize(ProxyConfig proxyConfig);
|
void initializeProxyServer(ProxyConfig proxyConfig);
|
||||||
|
|
||||||
/**
|
Transporter warpProxyTransporter(ServerType currentServerType);
|
||||||
* 代理请求
|
|
||||||
* @param proxyRequest 代理请求
|
|
||||||
* @return 代理响应
|
|
||||||
*/
|
|
||||||
CompletionStage<ProxyResult> sendProxyRequest(ProxyRequest proxyRequest);
|
|
||||||
}
|
}
|
||||||
|
@ -1,96 +0,0 @@
|
|||||||
package tech.powerjob.remote.framework.proxy;
|
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
|
||||||
import tech.powerjob.common.PowerSerializable;
|
|
||||||
import tech.powerjob.common.serialize.JsonUtils;
|
|
||||||
import tech.powerjob.remote.framework.base.RemotingException;
|
|
||||||
import tech.powerjob.remote.framework.base.URL;
|
|
||||||
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
|
|
||||||
import tech.powerjob.remote.framework.cs.ProxyConfig;
|
|
||||||
import tech.powerjob.remote.framework.transporter.Protocol;
|
|
||||||
import tech.powerjob.remote.framework.transporter.Transporter;
|
|
||||||
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.CompletionStage;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 具有代理功能的通讯器
|
|
||||||
*
|
|
||||||
* @author tjq
|
|
||||||
* @since 2023/11/15
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
public class ProxyTransporter implements Transporter {
|
|
||||||
|
|
||||||
private final ProxyService proxyService;
|
|
||||||
private final Transporter realTransporter;
|
|
||||||
private final CSInitializerConfig csInitializerConfig;
|
|
||||||
|
|
||||||
public ProxyTransporter(ProxyService proxyService, Transporter realTransporter, CSInitializerConfig csInitializerConfig) {
|
|
||||||
this.proxyService = proxyService;
|
|
||||||
this.realTransporter = realTransporter;
|
|
||||||
this.csInitializerConfig = csInitializerConfig;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Protocol getProtocol() {
|
|
||||||
return realTransporter.getProtocol();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void tell(URL url, PowerSerializable request) {
|
|
||||||
if (skipProxy(url)) {
|
|
||||||
realTransporter.tell(url, request);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
ProxyRequest proxyRequest = new ProxyRequest().setUrl(url).setRequest(request).setProxyMethod(ProxyMethod.TELL.getV());
|
|
||||||
proxyService.sendProxyRequest(proxyRequest);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
|
|
||||||
if (skipProxy(url)) {
|
|
||||||
return realTransporter.ask(url, request, clz);
|
|
||||||
}
|
|
||||||
ProxyRequest proxyRequest = new ProxyRequest().setUrl(url).setRequest(request).setProxyMethod(ProxyMethod.ASK.getV());
|
|
||||||
CompletionStage<ProxyResult> proxyRequestCompletionStage = proxyService.sendProxyRequest(proxyRequest);
|
|
||||||
return proxyRequestCompletionStage.thenApply(pr -> {
|
|
||||||
if (pr.isSuccess()) {
|
|
||||||
if (clz == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
if (clz.equals(String.class)) {
|
|
||||||
return (T) pr.getData();
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
return JsonUtils.parseObject(pr.getData(), clz);
|
|
||||||
} catch (Exception e) {
|
|
||||||
ExceptionUtils.rethrow(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw new RemotingException("proxy failed, msg: " + pr.getMsg());
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean skipProxy(URL url) {
|
|
||||||
|
|
||||||
ProxyConfig proxyConfig = csInitializerConfig.getProxyConfig();
|
|
||||||
if (proxyConfig == null) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (!proxyConfig.isUseProxy()) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (StringUtils.isEmpty(proxyConfig.getProxyUrl())) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 仅对向通讯需要使用代理
|
|
||||||
return Objects.equals(url.getServerType(), csInitializerConfig.getServerType());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,4 +1,4 @@
|
|||||||
package tech.powerjob.remote.framework.proxy;
|
package tech.powerjob.remote.framework.proxy.module;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
@ -0,0 +1,67 @@
|
|||||||
|
package tech.powerjob.remote.framework.proxy.module;
|
||||||
|
|
||||||
|
import tech.powerjob.common.serialize.JsonUtils;
|
||||||
|
import tech.powerjob.remote.framework.base.URL;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 请求对象
|
||||||
|
*
|
||||||
|
* @author tjq
|
||||||
|
* @since 2023/11/15
|
||||||
|
*/
|
||||||
|
public class ProxyRequest implements Serializable {
|
||||||
|
|
||||||
|
public ProxyRequest() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 真正地访问地址
|
||||||
|
*/
|
||||||
|
private URL url;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 真正地请求数据
|
||||||
|
*/
|
||||||
|
private String request;
|
||||||
|
|
||||||
|
private Class<?> clz;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link ProxyMethod}
|
||||||
|
*/
|
||||||
|
private Integer proxyMethod;
|
||||||
|
|
||||||
|
public URL getUrl() {
|
||||||
|
return url;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ProxyRequest setUrl(URL url) {
|
||||||
|
this.url = url;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRequest() {
|
||||||
|
return request;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ProxyRequest setRequest(Object request) {
|
||||||
|
this.request = JsonUtils.toJSONString(request);
|
||||||
|
this.clz = request.getClass();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Class<?> getClz() {
|
||||||
|
return clz;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getProxyMethod() {
|
||||||
|
return proxyMethod;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ProxyRequest setProxyMethod(Integer proxyMethod) {
|
||||||
|
this.proxyMethod = proxyMethod;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package tech.powerjob.remote.framework.proxy;
|
package tech.powerjob.remote.framework.proxy.module;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.experimental.Accessors;
|
import lombok.experimental.Accessors;
|
@ -1,12 +1,10 @@
|
|||||||
package tech.powerjob.remote.framework.engine;
|
package tech.powerjob.remote.framework.engine;
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import tech.powerjob.remote.framework.base.Address;
|
import tech.powerjob.remote.framework.base.Address;
|
||||||
|
import tech.powerjob.remote.framework.engine.config.EngineConfig;
|
||||||
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
|
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.*;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RemoteEngineTest
|
* RemoteEngineTest
|
||||||
*
|
*
|
||||||
|
@ -0,0 +1,58 @@
|
|||||||
|
package tech.powerjob.remote.framework.proxy;
|
||||||
|
|
||||||
|
import lombok.SneakyThrows;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import tech.powerjob.common.response.AskResponse;
|
||||||
|
import tech.powerjob.remote.framework.base.URL;
|
||||||
|
import tech.powerjob.remote.framework.engine.config.ProxyConfig;
|
||||||
|
import tech.powerjob.remote.framework.proxy.module.ProxyMethod;
|
||||||
|
import tech.powerjob.remote.framework.proxy.module.ProxyRequest;
|
||||||
|
import tech.powerjob.remote.framework.proxy.module.ProxyResult;
|
||||||
|
import tech.powerjob.remote.framework.test.TestTransporter;
|
||||||
|
|
||||||
|
import java.util.concurrent.CompletionStage;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HttpProxyServiceTest
|
||||||
|
*
|
||||||
|
* @author tjq
|
||||||
|
* @since 2023/11/16
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
class HttpProxyServiceTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SneakyThrows
|
||||||
|
void testHttpProxyService() {
|
||||||
|
|
||||||
|
ProxyConfig proxyConfig = new ProxyConfig();
|
||||||
|
proxyConfig.setEnableProxyServer(true);
|
||||||
|
|
||||||
|
proxyConfig.setProxyUrl("http://127.0.0.1:9999");
|
||||||
|
proxyConfig.setUseProxy(true);
|
||||||
|
|
||||||
|
|
||||||
|
HttpProxyService httpProxyService = new HttpProxyService(new TestTransporter());
|
||||||
|
|
||||||
|
httpProxyService.initializeProxyServer(proxyConfig);
|
||||||
|
|
||||||
|
URL url = new URL();
|
||||||
|
AskResponse askResponse = new AskResponse();
|
||||||
|
askResponse.setMessage("from test");
|
||||||
|
|
||||||
|
ProxyRequest tellProxyRequest = new ProxyRequest().setUrl(url).setRequest(askResponse).setProxyMethod(ProxyMethod.TELL.getV());
|
||||||
|
CompletionStage<ProxyResult> tellProxyResultCompletionStage = httpProxyService.sendProxyRequest(tellProxyRequest);
|
||||||
|
ProxyResult tellProxyResult = tellProxyResultCompletionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||||
|
log.info("[HttpProxyServiceTest] tellProxyResult: {}", tellProxyResult);
|
||||||
|
|
||||||
|
ProxyRequest askProxyRequest = new ProxyRequest().setUrl(url).setRequest(askResponse).setProxyMethod(ProxyMethod.ASK.getV());
|
||||||
|
CompletionStage<ProxyResult> askProxyResultCompletionStage = httpProxyService.sendProxyRequest(askProxyRequest);
|
||||||
|
ProxyResult askProxyResult = askProxyResultCompletionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||||
|
log.info("[HttpProxyServiceTest] askProxyResult: {}", askProxyResult);
|
||||||
|
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,41 @@
|
|||||||
|
package tech.powerjob.remote.framework.test;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import tech.powerjob.common.PowerSerializable;
|
||||||
|
import tech.powerjob.common.response.AskResponse;
|
||||||
|
import tech.powerjob.remote.framework.base.RemotingException;
|
||||||
|
import tech.powerjob.remote.framework.base.URL;
|
||||||
|
import tech.powerjob.remote.framework.transporter.Protocol;
|
||||||
|
import tech.powerjob.remote.framework.transporter.Transporter;
|
||||||
|
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.CompletionStage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TestTransporter
|
||||||
|
*
|
||||||
|
* @author tjq
|
||||||
|
* @since 2023/11/16
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class TestTransporter implements Transporter {
|
||||||
|
@Override
|
||||||
|
public Protocol getProtocol() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void tell(URL url, PowerSerializable request) {
|
||||||
|
log.info("[TestTransporter] invoke tell, url: {}, request: {}", url, request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
|
||||||
|
log.info("[TestTransporter] invoke ask, url: {}, request: {}", url, request);
|
||||||
|
AskResponse askResponse = new AskResponse();
|
||||||
|
askResponse.setSuccess(true);
|
||||||
|
askResponse.setMessage("FromTestTransporter, force success");
|
||||||
|
return (CompletionStage<T>) CompletableFuture.completedFuture(askResponse);
|
||||||
|
}
|
||||||
|
}
|
@ -9,7 +9,7 @@ import tech.powerjob.remote.framework.BenchmarkActor;
|
|||||||
import tech.powerjob.remote.framework.base.Address;
|
import tech.powerjob.remote.framework.base.Address;
|
||||||
import tech.powerjob.remote.framework.base.HandlerLocation;
|
import tech.powerjob.remote.framework.base.HandlerLocation;
|
||||||
import tech.powerjob.remote.framework.base.URL;
|
import tech.powerjob.remote.framework.base.URL;
|
||||||
import tech.powerjob.remote.framework.engine.EngineConfig;
|
import tech.powerjob.remote.framework.engine.config.EngineConfig;
|
||||||
import tech.powerjob.remote.framework.engine.EngineOutput;
|
import tech.powerjob.remote.framework.engine.EngineOutput;
|
||||||
import tech.powerjob.remote.framework.engine.RemoteEngine;
|
import tech.powerjob.remote.framework.engine.RemoteEngine;
|
||||||
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
|
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
|
||||||
|
@ -22,7 +22,7 @@ import tech.powerjob.remote.framework.base.Address;
|
|||||||
import tech.powerjob.remote.framework.base.RemotingException;
|
import tech.powerjob.remote.framework.base.RemotingException;
|
||||||
import tech.powerjob.remote.framework.base.ServerType;
|
import tech.powerjob.remote.framework.base.ServerType;
|
||||||
import tech.powerjob.remote.framework.base.URL;
|
import tech.powerjob.remote.framework.base.URL;
|
||||||
import tech.powerjob.remote.framework.engine.EngineConfig;
|
import tech.powerjob.remote.framework.engine.config.EngineConfig;
|
||||||
import tech.powerjob.remote.framework.engine.EngineOutput;
|
import tech.powerjob.remote.framework.engine.EngineOutput;
|
||||||
import tech.powerjob.remote.framework.engine.RemoteEngine;
|
import tech.powerjob.remote.framework.engine.RemoteEngine;
|
||||||
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
|
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
|
||||||
|
@ -10,7 +10,7 @@ import tech.powerjob.common.utils.NetUtils;
|
|||||||
import tech.powerjob.common.utils.PropertyUtils;
|
import tech.powerjob.common.utils.PropertyUtils;
|
||||||
import tech.powerjob.remote.framework.base.Address;
|
import tech.powerjob.remote.framework.base.Address;
|
||||||
import tech.powerjob.remote.framework.base.ServerType;
|
import tech.powerjob.remote.framework.base.ServerType;
|
||||||
import tech.powerjob.remote.framework.engine.EngineConfig;
|
import tech.powerjob.remote.framework.engine.config.EngineConfig;
|
||||||
import tech.powerjob.remote.framework.engine.EngineOutput;
|
import tech.powerjob.remote.framework.engine.EngineOutput;
|
||||||
import tech.powerjob.remote.framework.engine.RemoteEngine;
|
import tech.powerjob.remote.framework.engine.RemoteEngine;
|
||||||
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
|
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user