From 77f923381928b24153ae4ecbe841d057ec136a7e Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 1 Dec 2023 22:07:39 +0800 Subject: [PATCH] fix: some bug of HttpProxyService --- .../framework/proxy/HttpProxyService.java | 43 +++++++++++++++++-- .../framework/proxy/module/ProxyRequest.java | 5 +++ .../framework/proxy/HttpProxyServiceTest.java | 8 ++++ .../remote/http/HttpVertxCSInitializer.java | 2 +- .../resources/application-test2.properties | 24 +++++++++++ .../PowerJobAutoConfiguration.java | 2 + .../autoconfigure/PowerJobProperties.java | 2 + .../tech/powerjob/worker/PowerJobWorker.java | 4 +- .../worker/common/PowerJobWorkerConfig.java | 2 +- 9 files changed, 84 insertions(+), 8 deletions(-) create mode 100644 powerjob-worker-samples/src/main/resources/application-test2.properties diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/HttpProxyService.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/HttpProxyService.java index 9d88f7da..d5d02117 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/HttpProxyService.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/HttpProxyService.java @@ -1,5 +1,6 @@ package tech.powerjob.remote.framework.proxy; +import com.google.common.collect.Sets; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -28,6 +29,7 @@ import tech.powerjob.shade.io.vertx.ext.web.Router; import tech.powerjob.shade.io.vertx.ext.web.handler.BodyHandler; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -47,6 +49,8 @@ public class HttpProxyService implements ProxyService { private static final String PROXY_PATH = "/proxy"; + private static final Set INITIALIZED_PORTS = Sets.newHashSet(); + public HttpProxyService(Transporter transporter) { this.transporter = transporter; } @@ -61,6 +65,14 @@ public class HttpProxyService implements ProxyService { return; } + // server 存在多套协议时,会初始化多次 proxy-server,同一个端口只需要初始化一次 + Integer proxyServerPort = proxyConfig.getProxyServerPort(); + if (INITIALIZED_PORTS.contains(proxyServerPort)) { + log.info("[HttpProxyService] this port already publish the proxy service, skip publish!"); + return; + } + + INITIALIZED_PORTS.add(proxyServerPort); log.info("[HttpProxyService] start to initialize proxy server by proxy config: {}", proxyConfig); HttpServerOptions httpServerOptions = new HttpServerOptions().setIdleTimeout(300); @@ -72,6 +84,10 @@ public class HttpProxyService implements ProxyService { final RequestBody body = ctx.body(); ProxyRequest proxyRequest = body.asPojo(ProxyRequest.class); + if (log.isDebugEnabled()) { + log.debug("[HttpProxyService] receive proxy request: {}", proxyRequest); + } + PowerSerializable ret = (PowerSerializable) JsonUtils.parseObjectUnsafe(proxyRequest.getRequest(), proxyRequest.getClz()); if (ProxyMethod.TELL.getV().equals(proxyRequest.getProxyMethod())) { @@ -90,14 +106,15 @@ public class HttpProxyService implements ProxyService { log.error("[HttpProxyService] proxy request failed!", e); } - log.debug("[HttpProxyService] send proxy result: {}", proxyResult); + if (log.isDebugEnabled()) { + log.debug("[HttpProxyService] send proxy result: {}", proxyResult); + } ctx.json(JsonObject.mapFrom(proxyResult)); }); - Integer proxyServerPort = proxyConfig.getProxyServerPort(); httpServer.requestHandler(router) - .exceptionHandler(e -> log.error("[HttpProxyService] unknown exception in Actor communication!", e)) + .exceptionHandler(e -> log.error("[HttpProxyService] unknown exception in HttpProxyService!", e)) .listen(proxyServerPort) .toCompletionStage() .toCompletableFuture() @@ -109,6 +126,10 @@ public class HttpProxyService implements ProxyService { @Override public Transporter warpProxyTransporter(ServerType currentServerType) { + if (proxyConfig == null) { + return transporter; + } + if (proxyConfig.isUseProxy()) { String proxyUrl = proxyConfig.getProxyUrl(); if (StringUtils.isEmpty(proxyUrl)) { @@ -122,7 +143,11 @@ public class HttpProxyService implements ProxyService { CompletionStage sendProxyRequest(ProxyRequest proxyRequest) { - String fullUrl = String.format("%s/%s", proxyConfig.getProxyUrl(), PROXY_PATH); + String fullUrl = String.format("%s%s", fixUrl(proxyConfig.getProxyUrl()), PROXY_PATH); + + if (log.isDebugEnabled()) { + log.debug("[ProxyTransporter] send proxy request, url: {}, request: {}", fullUrl, proxyRequest); + } HttpClient httpClient = vertx().createHttpClient(); RequestOptions requestOptions = new RequestOptions() @@ -236,4 +261,14 @@ public class HttpProxyService implements ProxyService { } } + static String fixUrl(String url) { + if (url.endsWith("/")) { + url = url.substring(0, url.length() - 1); + } + if (url.startsWith("http")) { + return url; + } + return "http://".concat(url); + } + } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyRequest.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyRequest.java index 6d8fadad..881703d2 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyRequest.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/proxy/module/ProxyRequest.java @@ -64,4 +64,9 @@ public class ProxyRequest implements Serializable { this.proxyMethod = proxyMethod; return this; } + + @Override + public String toString() { + return JsonUtils.toJSONString(this); + } } diff --git a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/proxy/HttpProxyServiceTest.java b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/proxy/HttpProxyServiceTest.java index 59fe08eb..ed83ad0a 100644 --- a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/proxy/HttpProxyServiceTest.java +++ b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/proxy/HttpProxyServiceTest.java @@ -55,4 +55,12 @@ class HttpProxyServiceTest { Thread.sleep(1000); } + @Test + void testFixUrl() { + String correctUrl = "http://www.taobao.com"; + assert HttpProxyService.fixUrl("http://www.taobao.com").equals(correctUrl); + assert HttpProxyService.fixUrl("http://www.taobao.com/").equals(correctUrl); + assert HttpProxyService.fixUrl("www.taobao.com/").equals(correctUrl); + } + } \ No newline at end of file diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java index 6edb8381..12c48f99 100644 --- a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java +++ b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java @@ -94,7 +94,7 @@ public class HttpVertxCSInitializer implements CSInitializer { final String host = config.getBindAddress().getHost(); httpServer.requestHandler(router) - .exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e)) + .exceptionHandler(e -> log.error("[PowerJob] unknown exception in HttpVertx communication!", e)) .listen(port, host) .toCompletionStage() .toCompletableFuture() diff --git a/powerjob-worker-samples/src/main/resources/application-test2.properties b/powerjob-worker-samples/src/main/resources/application-test2.properties new file mode 100644 index 00000000..c2c05902 --- /dev/null +++ b/powerjob-worker-samples/src/main/resources/application-test2.properties @@ -0,0 +1,24 @@ +server.port=8082 +spring.jpa.open-in-view=false +########### PowerJob-worker properties. ########### +# Whether to enable PowerJob Worker, default is true +powerjob.worker.enabled=true +# Turn on test mode and do not force the server connection to be verified +powerjob.worker.allow-lazy-connect-server=false +# Transport port, default is 27777 +powerjob.worker.port=27778 +# Application name, used for grouping applications. Recommend to set the same value as project name. +powerjob.worker.app-name=powerjob-worker-samples +# Address of PowerJob-server node(s). Ip:port or domain. Multiple addresses should be separated with comma. +powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701 +# transport protocol between server and worker +powerjob.worker.protocol=http +# Store strategy of H2 database. disk or memory. Default value is disk. +powerjob.worker.store-strategy=disk +# Max length of result. Results that are longer than the value will be truncated. +powerjob.worker.max-result-length=4096 +# Max length of appended workflow context . Appended workflow context value that is longer than the value will be ignore. +powerjob.worker.max-appended-wf-context-length=4096 + +# test proxy server +powerjob.worker.proxy-server-address=http://127.0.0.1:9999 \ No newline at end of file diff --git a/powerjob-worker-spring-boot-starter/src/main/java/tech/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java b/powerjob-worker-spring-boot-starter/src/main/java/tech/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java index 2687a281..1c9ad807 100644 --- a/powerjob-worker-spring-boot-starter/src/main/java/tech/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java +++ b/powerjob-worker-spring-boot-starter/src/main/java/tech/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java @@ -85,6 +85,8 @@ public class PowerJobAutoConfiguration { config.setMaxLightweightTaskNum(worker.getMaxLightweightTaskNum()); config.setHealthReportInterval(worker.getHealthReportInterval()); + + config.setProxyServerAddress(worker.getProxyServerAddress()); /* * Create PowerJobSpringWorker object and set properties. */ diff --git a/powerjob-worker-spring-boot-starter/src/main/java/tech/powerjob/worker/autoconfigure/PowerJobProperties.java b/powerjob-worker-spring-boot-starter/src/main/java/tech/powerjob/worker/autoconfigure/PowerJobProperties.java index fe054e1e..8d9939e7 100644 --- a/powerjob-worker-spring-boot-starter/src/main/java/tech/powerjob/worker/autoconfigure/PowerJobProperties.java +++ b/powerjob-worker-spring-boot-starter/src/main/java/tech/powerjob/worker/autoconfigure/PowerJobProperties.java @@ -170,5 +170,7 @@ public class PowerJobProperties { */ private Integer healthReportInterval = 10; + private String proxyServerAddress; + } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index 6ca075c3..1a4c19fa 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -107,8 +107,8 @@ public class PowerJobWorker { .setServerType(ServerType.WORKER) .setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort)) .setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor)); - if (StringUtils.isNotEmpty(config.getServerProxyAddress())) { - ProxyConfig proxyConfig = new ProxyConfig().setUseProxy(true).setProxyUrl(config.getServerProxyAddress()); + if (StringUtils.isNotEmpty(config.getProxyServerAddress())) { + ProxyConfig proxyConfig = new ProxyConfig().setUseProxy(true).setProxyUrl(config.getProxyServerAddress()); engineConfig.setProxyConfig(proxyConfig); log.info("[PowerJobWorker] active proxy by config, proxy config: {}", proxyConfig); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java index 8d9b718e..b95da978 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java @@ -91,5 +91,5 @@ public class PowerJobWorkerConfig { /** * server proxy address */ - private String serverProxyAddress; + private String proxyServerAddress; }