fix: some bug of HttpProxyService

This commit is contained in:
tjq 2023-12-01 22:07:39 +08:00
parent f0ff32ccc4
commit 77f9233819
9 changed files with 84 additions and 8 deletions

View File

@ -1,5 +1,6 @@
package tech.powerjob.remote.framework.proxy;
import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@ -28,6 +29,7 @@ import tech.powerjob.shade.io.vertx.ext.web.Router;
import tech.powerjob.shade.io.vertx.ext.web.handler.BodyHandler;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
@ -47,6 +49,8 @@ public class HttpProxyService implements ProxyService {
private static final String PROXY_PATH = "/proxy";
private static final Set<Integer> INITIALIZED_PORTS = Sets.newHashSet();
public HttpProxyService(Transporter transporter) {
this.transporter = transporter;
}
@ -61,6 +65,14 @@ public class HttpProxyService implements ProxyService {
return;
}
// server 存在多套协议时会初始化多次 proxy-server同一个端口只需要初始化一次
Integer proxyServerPort = proxyConfig.getProxyServerPort();
if (INITIALIZED_PORTS.contains(proxyServerPort)) {
log.info("[HttpProxyService] this port already publish the proxy service, skip publish!");
return;
}
INITIALIZED_PORTS.add(proxyServerPort);
log.info("[HttpProxyService] start to initialize proxy server by proxy config: {}", proxyConfig);
HttpServerOptions httpServerOptions = new HttpServerOptions().setIdleTimeout(300);
@ -72,6 +84,10 @@ public class HttpProxyService implements ProxyService {
final RequestBody body = ctx.body();
ProxyRequest proxyRequest = body.asPojo(ProxyRequest.class);
if (log.isDebugEnabled()) {
log.debug("[HttpProxyService] receive proxy request: {}", proxyRequest);
}
PowerSerializable ret = (PowerSerializable) JsonUtils.parseObjectUnsafe(proxyRequest.getRequest(), proxyRequest.getClz());
if (ProxyMethod.TELL.getV().equals(proxyRequest.getProxyMethod())) {
@ -90,14 +106,15 @@ public class HttpProxyService implements ProxyService {
log.error("[HttpProxyService] proxy request failed!", e);
}
log.debug("[HttpProxyService] send proxy result: {}", proxyResult);
if (log.isDebugEnabled()) {
log.debug("[HttpProxyService] send proxy result: {}", proxyResult);
}
ctx.json(JsonObject.mapFrom(proxyResult));
});
Integer proxyServerPort = proxyConfig.getProxyServerPort();
httpServer.requestHandler(router)
.exceptionHandler(e -> log.error("[HttpProxyService] unknown exception in Actor communication!", e))
.exceptionHandler(e -> log.error("[HttpProxyService] unknown exception in HttpProxyService!", e))
.listen(proxyServerPort)
.toCompletionStage()
.toCompletableFuture()
@ -109,6 +126,10 @@ public class HttpProxyService implements ProxyService {
@Override
public Transporter warpProxyTransporter(ServerType currentServerType) {
if (proxyConfig == null) {
return transporter;
}
if (proxyConfig.isUseProxy()) {
String proxyUrl = proxyConfig.getProxyUrl();
if (StringUtils.isEmpty(proxyUrl)) {
@ -122,7 +143,11 @@ public class HttpProxyService implements ProxyService {
CompletionStage<ProxyResult> sendProxyRequest(ProxyRequest proxyRequest) {
String fullUrl = String.format("%s/%s", proxyConfig.getProxyUrl(), PROXY_PATH);
String fullUrl = String.format("%s%s", fixUrl(proxyConfig.getProxyUrl()), PROXY_PATH);
if (log.isDebugEnabled()) {
log.debug("[ProxyTransporter] send proxy request, url: {}, request: {}", fullUrl, proxyRequest);
}
HttpClient httpClient = vertx().createHttpClient();
RequestOptions requestOptions = new RequestOptions()
@ -236,4 +261,14 @@ public class HttpProxyService implements ProxyService {
}
}
static String fixUrl(String url) {
if (url.endsWith("/")) {
url = url.substring(0, url.length() - 1);
}
if (url.startsWith("http")) {
return url;
}
return "http://".concat(url);
}
}

View File

@ -64,4 +64,9 @@ public class ProxyRequest implements Serializable {
this.proxyMethod = proxyMethod;
return this;
}
@Override
public String toString() {
return JsonUtils.toJSONString(this);
}
}

View File

@ -55,4 +55,12 @@ class HttpProxyServiceTest {
Thread.sleep(1000);
}
@Test
void testFixUrl() {
String correctUrl = "http://www.taobao.com";
assert HttpProxyService.fixUrl("http://www.taobao.com").equals(correctUrl);
assert HttpProxyService.fixUrl("http://www.taobao.com/").equals(correctUrl);
assert HttpProxyService.fixUrl("www.taobao.com/").equals(correctUrl);
}
}

View File

@ -94,7 +94,7 @@ public class HttpVertxCSInitializer implements CSInitializer {
final String host = config.getBindAddress().getHost();
httpServer.requestHandler(router)
.exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e))
.exceptionHandler(e -> log.error("[PowerJob] unknown exception in HttpVertx communication!", e))
.listen(port, host)
.toCompletionStage()
.toCompletableFuture()

View File

@ -0,0 +1,24 @@
server.port=8082
spring.jpa.open-in-view=false
########### PowerJob-worker properties. ###########
# Whether to enable PowerJob Worker, default is true
powerjob.worker.enabled=true
# Turn on test mode and do not force the server connection to be verified
powerjob.worker.allow-lazy-connect-server=false
# Transport port, default is 27777
powerjob.worker.port=27778
# Application name, used for grouping applications. Recommend to set the same value as project name.
powerjob.worker.app-name=powerjob-worker-samples
# Address of PowerJob-server node(s). Ip:port or domain. Multiple addresses should be separated with comma.
powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701
# transport protocol between server and worker
powerjob.worker.protocol=http
# Store strategy of H2 database. disk or memory. Default value is disk.
powerjob.worker.store-strategy=disk
# Max length of result. Results that are longer than the value will be truncated.
powerjob.worker.max-result-length=4096
# Max length of appended workflow context . Appended workflow context value that is longer than the value will be ignore.
powerjob.worker.max-appended-wf-context-length=4096
# test proxy server
powerjob.worker.proxy-server-address=http://127.0.0.1:9999

View File

@ -85,6 +85,8 @@ public class PowerJobAutoConfiguration {
config.setMaxLightweightTaskNum(worker.getMaxLightweightTaskNum());
config.setHealthReportInterval(worker.getHealthReportInterval());
config.setProxyServerAddress(worker.getProxyServerAddress());
/*
* Create PowerJobSpringWorker object and set properties.
*/

View File

@ -170,5 +170,7 @@ public class PowerJobProperties {
*/
private Integer healthReportInterval = 10;
private String proxyServerAddress;
}
}

View File

@ -107,8 +107,8 @@ public class PowerJobWorker {
.setServerType(ServerType.WORKER)
.setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort))
.setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));
if (StringUtils.isNotEmpty(config.getServerProxyAddress())) {
ProxyConfig proxyConfig = new ProxyConfig().setUseProxy(true).setProxyUrl(config.getServerProxyAddress());
if (StringUtils.isNotEmpty(config.getProxyServerAddress())) {
ProxyConfig proxyConfig = new ProxyConfig().setUseProxy(true).setProxyUrl(config.getProxyServerAddress());
engineConfig.setProxyConfig(proxyConfig);
log.info("[PowerJobWorker] active proxy by config, proxy config: {}", proxyConfig);
}

View File

@ -91,5 +91,5 @@ public class PowerJobWorkerConfig {
/**
* server proxy address
*/
private String serverProxyAddress;
private String proxyServerAddress;
}