feat: server and worker suit the proxy api

This commit is contained in:
tjq 2023-12-01 19:55:35 +08:00
parent 703a36e6e3
commit 0a2826391e
4 changed files with 27 additions and 1 deletions

View File

@ -25,6 +25,7 @@ import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.engine.config.EngineConfig; import tech.powerjob.remote.framework.engine.config.EngineConfig;
import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.EngineOutput;
import tech.powerjob.remote.framework.engine.RemoteEngine; import tech.powerjob.remote.framework.engine.RemoteEngine;
import tech.powerjob.remote.framework.engine.config.ProxyConfig;
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
import tech.powerjob.server.remote.transporter.ProtocolInfo; import tech.powerjob.server.remote.transporter.ProtocolInfo;
import tech.powerjob.server.remote.transporter.TransportService; import tech.powerjob.server.remote.transporter.TransportService;
@ -55,6 +56,15 @@ public class PowerTransportService implements TransportService, InitializingBean
@Value("${oms.transporter.main.protocol}") @Value("${oms.transporter.main.protocol}")
private String mainProtocol; private String mainProtocol;
/**
* 是否发布代理服务 & 代理服务的端口
* 用于 server worker 跨网段但可互通的场景 server K8S 集群内worker 在虚拟机集群内worker 通过域名可连接 server但无法根据 IP 直连使用代理即可完成 worker server 的访问
*/
@Value("${oms.transporter.proxy.enable}")
private boolean enableProxyServer;
@Value("${oms.transporter.proxy.server-port}")
private int proxyServerPort;
private static final String PROTOCOL_PORT_CONFIG = "oms.%s.port"; private static final String PROTOCOL_PORT_CONFIG = "oms.%s.port";
private final Environment environment; private final Environment environment;
@ -109,10 +119,14 @@ public class PowerTransportService implements TransportService, InitializingBean
Address address = new Address() Address address = new Address()
.setHost(NetUtils.getLocalHost()) .setHost(NetUtils.getLocalHost())
.setPort(port); .setPort(port);
ProxyConfig proxyConfig = new ProxyConfig()
.setEnableProxyServer(enableProxyServer)
.setProxyServerPort(proxyServerPort);
EngineConfig engineConfig = new EngineConfig() EngineConfig engineConfig = new EngineConfig()
.setServerType(ServerType.SERVER) .setServerType(ServerType.SERVER)
.setType(protocol.toUpperCase()) .setType(protocol.toUpperCase())
.setBindAddress(address) .setBindAddress(address)
.setProxyConfig(proxyConfig)
.setActorList(Lists.newArrayList(beansWithAnnotation.values())); .setActorList(Lists.newArrayList(beansWithAnnotation.values()));
log.info("[PowerTransportService] start to initialize RemoteEngine[type={},address={}]", protocol, address); log.info("[PowerTransportService] start to initialize RemoteEngine[type={},address={}]", protocol, address);
RemoteEngine re = new PowerJobRemoteEngine(); RemoteEngine re = new PowerJobRemoteEngine();

View File

@ -16,6 +16,8 @@ spring.servlet.multipart.max-request-size=209715200
###### PowerJob transporter configuration ###### ###### PowerJob transporter configuration ######
oms.transporter.active.protocols=AKKA,HTTP oms.transporter.active.protocols=AKKA,HTTP
oms.transporter.main.protocol=HTTP oms.transporter.main.protocol=HTTP
oms.transporter.proxy.enable=false
oms.transporter.proxy.server-port=9999
oms.akka.port=10086 oms.akka.port=10086
oms.http.port=10010 oms.http.port=10010
# Prefix for all tables. Default empty string. Config if you have needs, i.e. pj_ # Prefix for all tables. Default empty string. Config if you have needs, i.e. pj_

View File

@ -3,6 +3,7 @@ package tech.powerjob.worker;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.PowerJobDKey; import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.model.WorkerAppInfo; import tech.powerjob.common.model.WorkerAppInfo;
import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.CommonUtils;
@ -13,6 +14,7 @@ import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.engine.config.EngineConfig; import tech.powerjob.remote.framework.engine.config.EngineConfig;
import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.EngineOutput;
import tech.powerjob.remote.framework.engine.RemoteEngine; import tech.powerjob.remote.framework.engine.RemoteEngine;
import tech.powerjob.remote.framework.engine.config.ProxyConfig;
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
import tech.powerjob.worker.actors.ProcessorTrackerActor; import tech.powerjob.worker.actors.ProcessorTrackerActor;
import tech.powerjob.worker.actors.TaskTrackerActor; import tech.powerjob.worker.actors.TaskTrackerActor;
@ -105,6 +107,11 @@ public class PowerJobWorker {
.setServerType(ServerType.WORKER) .setServerType(ServerType.WORKER)
.setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort)) .setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort))
.setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor)); .setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));
if (StringUtils.isNotEmpty(config.getServerProxyAddress())) {
ProxyConfig proxyConfig = new ProxyConfig().setUseProxy(true).setProxyUrl(config.getServerProxyAddress());
engineConfig.setProxyConfig(proxyConfig);
log.info("[PowerJobWorker] active proxy by config, proxy config: {}", proxyConfig);
}
EngineOutput engineOutput = remoteEngine.start(engineConfig); EngineOutput engineOutput = remoteEngine.start(engineConfig);
workerRuntime.setTransporter(engineOutput.getTransporter()); workerRuntime.setTransporter(engineOutput.getTransporter());

View File

@ -88,5 +88,8 @@ public class PowerJobWorkerConfig {
* Interval(s) of worker health report * Interval(s) of worker health report
*/ */
private Integer healthReportInterval = 10; private Integer healthReportInterval = 10;
/**
* server proxy address
*/
private String serverProxyAddress;
} }