From 7318fed73aa3e52af41b8340cee01704daa06cf0 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 15 Jul 2023 21:38:56 +0800 Subject: [PATCH] feat: support non-LAN communication(worker side) --- .../tech/powerjob/common/PowerJobDKey.java | 9 +++++- .../powerjob/common/utils/PropertyUtils.java | 32 +++++++++++++++++++ .../remote/framework/base/Address.java | 6 +++- .../remote/transporter/ProtocolInfo.java | 22 +++++++++++-- .../impl/PowerTransportService.java | 2 +- .../tech/powerjob/worker/PowerJobWorker.java | 16 +++++++--- 6 files changed, 77 insertions(+), 10 deletions(-) create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/utils/PropertyUtils.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java index aa27ea48..92c3a353 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java @@ -5,7 +5,6 @@ import java.net.NetworkInterface; /** * 通过 JVM 启动参数传入的配置信息 * - * * @author tjq * @since 2020/8/8 */ @@ -16,7 +15,15 @@ public class PowerJobDKey { */ public static final String PREFERRED_NETWORK_INTERFACE = "powerjob.network.interface.preferred"; + /** + * 绑定地址,一般填写本机网卡地址 + */ public static final String BIND_LOCAL_ADDRESS = "powerjob.network.local.address"; + /** + * 外部地址,可选,默认与绑定地址相同。当存在 NAT 等场景时可通过单独传递外部地址来实现通讯 + */ + public static final String NT_EXTERNAL_ADDRESS = "powerjob.network.external.address"; + public static final String NT_EXTERNAL_PORT = "powerjob.network.external.port"; /** * Java regular expressions for network interfaces that will be ignored. diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/PropertyUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/PropertyUtils.java new file mode 100644 index 00000000..c1458586 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/PropertyUtils.java @@ -0,0 +1,32 @@ +package tech.powerjob.common.utils; + +import org.apache.commons.lang3.StringUtils; + +/** + * PropertyUtils + * + * @author tjq + * @since 2023/7/15 + */ +public class PropertyUtils { + + public static String readProperty(String key, String defaultValue) { + // 从启动参数读取 + String property = System.getProperty(key); + if (StringUtils.isNotEmpty(property)) { + return property; + } + + // 从 ENV 读取 + property= System.getenv(key); + if (StringUtils.isNotEmpty(property)) { + return property; + } + // 部分操作系统不兼容 a.b.c 的环境变量,转换为 a_b_c 再取一次,即 PowerJob 支持 2 种类型的环境变量 key + property = System.getenv(key.replaceAll("\\.", "_")); + if (StringUtils.isNotEmpty(property)) { + return property; + } + return defaultValue; + } +} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java index 5541f111..873b9eeb 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java @@ -20,7 +20,7 @@ public class Address implements Serializable { private int port; public String toFullAddress() { - return String.format("%s:%d", host, port); + return toFullAddress(host, port); } public static Address fromIpv4(String ipv4) { @@ -30,6 +30,10 @@ public class Address implements Serializable { .setPort(Integer.parseInt(split[1])); } + public static String toFullAddress(String host, int port) { + return String.format("%s:%d", host, port); + } + @Override public String toString() { return toFullAddress(); diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java index 0802e811..c27d939c 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java @@ -3,6 +3,9 @@ package tech.powerjob.server.remote.transporter; import lombok.Getter; import lombok.Setter; import lombok.ToString; +import tech.powerjob.common.PowerJobDKey; +import tech.powerjob.common.utils.PropertyUtils; +import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.transporter.Transporter; /** @@ -20,11 +23,26 @@ public class ProtocolInfo { private String address; + /** + * 外部地址,当存在 NAT 等场景时,需要下发该地址到 worker + */ + private String externalAddress; + private transient Transporter transporter; - public ProtocolInfo(String protocol, String address, Transporter transporter) { + public ProtocolInfo(String protocol, String host, int port, Transporter transporter) { this.protocol = protocol; - this.address = address; this.transporter = transporter; + + this.address = Address.toFullAddress(host, port); + + // 处理外部地址 + String externalAddress = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, host); + + // 考虑到不同协议 port 理论上不一样,server 需要为每个单独的端口配置映射,规则为 powerjob.network.external.port.${协议},比如 powerjob.network.external.port.http + String externalPortByProtocolKey = PowerJobDKey.NT_EXTERNAL_PORT.concat(".").concat(protocol.toLowerCase()); + // 大部分用户只使用一种协议,在此处做兼容处理降低答疑量和提高易用性(如果用户有多种协议,只有被转发的协议能成功通讯) + String externalPort = PropertyUtils.readProperty(externalPortByProtocolKey, PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(port))); + this.externalAddress = Address.toFullAddress(externalAddress, Integer.parseInt(externalPort)); } } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java index d5d2f624..331a83ad 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java @@ -120,7 +120,7 @@ public class PowerTransportService implements TransportService, InitializingBean log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address); this.engines.add(re); - this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter())); + this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.getHost(), address.getPort(), engineOutput.getTransporter())); } @Override diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index 61ea7271..bbe682d2 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -3,12 +3,14 @@ package tech.powerjob.worker; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.PowerJobDKey; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.HttpUtils; import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.common.utils.PropertyUtils; import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.ServerType; import tech.powerjob.remote.framework.engine.EngineConfig; @@ -79,9 +81,13 @@ public class PowerJobWorker { log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env."); } - // 初始化元数据 - String workerAddress = NetUtils.getLocalHost() + ":" + config.getPort(); - workerRuntime.setWorkerAddress(workerAddress); + // 初始化网络数据,区别对待上报地址和本机绑定地址(对外统一使用上报地址) + String localBindIp = NetUtils.getLocalHost(); + int localBindPort = config.getPort(); + String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, localBindIp); + String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort)); + log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort); + workerRuntime.setWorkerAddress(Address.toFullAddress(externalIp, Integer.parseInt(externalPort))); // 初始化 线程池 final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig()); @@ -100,7 +106,7 @@ public class PowerJobWorker { EngineConfig engineConfig = new EngineConfig() .setType(config.getProtocol().name()) .setServerType(ServerType.WORKER) - .setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort())) + .setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort)) .setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor)); EngineOutput engineOutput = remoteEngine.start(engineConfig); @@ -115,7 +121,7 @@ public class PowerJobWorker { log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully."); // 初始化日志系统 - OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, workerRuntime.getTransporter(), serverDiscoveryService); + OmsLogHandler omsLogHandler = new OmsLogHandler(workerRuntime.getWorkerAddress(), workerRuntime.getTransporter(), serverDiscoveryService); workerRuntime.setOmsLogHandler(omsLogHandler); // 初始化存储