From 0a2826391e5cb27bff42c4303b89a55114b3a6fd Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 1 Dec 2023 19:55:35 +0800 Subject: [PATCH] feat: server and worker suit the proxy api --- .../transporter/impl/PowerTransportService.java | 14 ++++++++++++++ .../src/main/resources/application.properties | 2 ++ .../java/tech/powerjob/worker/PowerJobWorker.java | 7 +++++++ .../worker/common/PowerJobWorkerConfig.java | 5 ++++- 4 files changed, 27 insertions(+), 1 deletion(-) diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java index 7f106558..8481c5e6 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java @@ -25,6 +25,7 @@ import tech.powerjob.remote.framework.base.URL; import tech.powerjob.remote.framework.engine.config.EngineConfig; import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.RemoteEngine; +import tech.powerjob.remote.framework.engine.config.ProxyConfig; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; import tech.powerjob.server.remote.transporter.ProtocolInfo; import tech.powerjob.server.remote.transporter.TransportService; @@ -55,6 +56,15 @@ public class PowerTransportService implements TransportService, InitializingBean @Value("${oms.transporter.main.protocol}") private String mainProtocol; + /** + * 是否发布代理服务 & 代理服务的端口 + * 用于 server 和 worker 跨网段但可互通的场景,如 server 在 K8S 集群内,worker 在虚拟机集群内。worker 通过域名可连接 server,但无法根据 IP 直连。使用代理即可完成 worker 对 server 的访问 + */ + @Value("${oms.transporter.proxy.enable}") + private boolean enableProxyServer; + @Value("${oms.transporter.proxy.server-port}") + private int proxyServerPort; + private static final String PROTOCOL_PORT_CONFIG = "oms.%s.port"; private final Environment environment; @@ -109,10 +119,14 @@ public class PowerTransportService implements TransportService, InitializingBean Address address = new Address() .setHost(NetUtils.getLocalHost()) .setPort(port); + ProxyConfig proxyConfig = new ProxyConfig() + .setEnableProxyServer(enableProxyServer) + .setProxyServerPort(proxyServerPort); EngineConfig engineConfig = new EngineConfig() .setServerType(ServerType.SERVER) .setType(protocol.toUpperCase()) .setBindAddress(address) + .setProxyConfig(proxyConfig) .setActorList(Lists.newArrayList(beansWithAnnotation.values())); log.info("[PowerTransportService] start to initialize RemoteEngine[type={},address={}]", protocol, address); RemoteEngine re = new PowerJobRemoteEngine(); diff --git a/powerjob-server/powerjob-server-starter/src/main/resources/application.properties b/powerjob-server/powerjob-server-starter/src/main/resources/application.properties index 1b74735b..8e4a4838 100644 --- a/powerjob-server/powerjob-server-starter/src/main/resources/application.properties +++ b/powerjob-server/powerjob-server-starter/src/main/resources/application.properties @@ -16,6 +16,8 @@ spring.servlet.multipart.max-request-size=209715200 ###### PowerJob transporter configuration ###### oms.transporter.active.protocols=AKKA,HTTP oms.transporter.main.protocol=HTTP +oms.transporter.proxy.enable=false +oms.transporter.proxy.server-port=9999 oms.akka.port=10086 oms.http.port=10010 # Prefix for all tables. Default empty string. Config if you have needs, i.e. pj_ diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index 6712f0b6..6ca075c3 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -3,6 +3,7 @@ package tech.powerjob.worker; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import tech.powerjob.common.PowerJobDKey; import tech.powerjob.common.model.WorkerAppInfo; import tech.powerjob.common.utils.CommonUtils; @@ -13,6 +14,7 @@ import tech.powerjob.remote.framework.base.ServerType; import tech.powerjob.remote.framework.engine.config.EngineConfig; import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.RemoteEngine; +import tech.powerjob.remote.framework.engine.config.ProxyConfig; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; import tech.powerjob.worker.actors.ProcessorTrackerActor; import tech.powerjob.worker.actors.TaskTrackerActor; @@ -105,6 +107,11 @@ public class PowerJobWorker { .setServerType(ServerType.WORKER) .setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort)) .setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor)); + if (StringUtils.isNotEmpty(config.getServerProxyAddress())) { + ProxyConfig proxyConfig = new ProxyConfig().setUseProxy(true).setProxyUrl(config.getServerProxyAddress()); + engineConfig.setProxyConfig(proxyConfig); + log.info("[PowerJobWorker] active proxy by config, proxy config: {}", proxyConfig); + } EngineOutput engineOutput = remoteEngine.start(engineConfig); workerRuntime.setTransporter(engineOutput.getTransporter()); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java index cb34853b..8d9b718e 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java @@ -88,5 +88,8 @@ public class PowerJobWorkerConfig { * Interval(s) of worker health report */ private Integer healthReportInterval = 10; - + /** + * server proxy address + */ + private String serverProxyAddress; }