feat: support non-LAN communication(worker side)

This commit is contained in:
tjq 2023-07-15 21:38:56 +08:00
parent 67a22e8b7e
commit 7318fed73a
6 changed files with 77 additions and 10 deletions

View File

@ -5,7 +5,6 @@ import java.net.NetworkInterface;
/**
* 通过 JVM 启动参数传入的配置信息
*
*
* @author tjq
* @since 2020/8/8
*/
@ -16,7 +15,15 @@ public class PowerJobDKey {
*/
public static final String PREFERRED_NETWORK_INTERFACE = "powerjob.network.interface.preferred";
/**
* 绑定地址一般填写本机网卡地址
*/
public static final String BIND_LOCAL_ADDRESS = "powerjob.network.local.address";
/**
* 外部地址可选默认与绑定地址相同当存在 NAT 等场景时可通过单独传递外部地址来实现通讯
*/
public static final String NT_EXTERNAL_ADDRESS = "powerjob.network.external.address";
public static final String NT_EXTERNAL_PORT = "powerjob.network.external.port";
/**
* Java regular expressions for network interfaces that will be ignored.

View File

@ -0,0 +1,32 @@
package tech.powerjob.common.utils;
import org.apache.commons.lang3.StringUtils;
/**
* PropertyUtils
*
* @author tjq
* @since 2023/7/15
*/
public class PropertyUtils {
public static String readProperty(String key, String defaultValue) {
// 从启动参数读取
String property = System.getProperty(key);
if (StringUtils.isNotEmpty(property)) {
return property;
}
// ENV 读取
property= System.getenv(key);
if (StringUtils.isNotEmpty(property)) {
return property;
}
// 部分操作系统不兼容 a.b.c 的环境变量转换为 a_b_c 再取一次 PowerJob 支持 2 种类型的环境变量 key
property = System.getenv(key.replaceAll("\\.", "_"));
if (StringUtils.isNotEmpty(property)) {
return property;
}
return defaultValue;
}
}

View File

@ -20,7 +20,7 @@ public class Address implements Serializable {
private int port;
public String toFullAddress() {
return String.format("%s:%d", host, port);
return toFullAddress(host, port);
}
public static Address fromIpv4(String ipv4) {
@ -30,6 +30,10 @@ public class Address implements Serializable {
.setPort(Integer.parseInt(split[1]));
}
public static String toFullAddress(String host, int port) {
return String.format("%s:%d", host, port);
}
@Override
public String toString() {
return toFullAddress();

View File

@ -3,6 +3,9 @@ package tech.powerjob.server.remote.transporter;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.utils.PropertyUtils;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.transporter.Transporter;
/**
@ -20,11 +23,26 @@ public class ProtocolInfo {
private String address;
/**
* 外部地址当存在 NAT 等场景时需要下发该地址到 worker
*/
private String externalAddress;
private transient Transporter transporter;
public ProtocolInfo(String protocol, String address, Transporter transporter) {
public ProtocolInfo(String protocol, String host, int port, Transporter transporter) {
this.protocol = protocol;
this.address = address;
this.transporter = transporter;
this.address = Address.toFullAddress(host, port);
// 处理外部地址
String externalAddress = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, host);
// 考虑到不同协议 port 理论上不一样server 需要为每个单独的端口配置映射规则为 powerjob.network.external.port.${协议}比如 powerjob.network.external.port.http
String externalPortByProtocolKey = PowerJobDKey.NT_EXTERNAL_PORT.concat(".").concat(protocol.toLowerCase());
// 大部分用户只使用一种协议在此处做兼容处理降低答疑量和提高易用性如果用户有多种协议只有被转发的协议能成功通讯
String externalPort = PropertyUtils.readProperty(externalPortByProtocolKey, PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(port)));
this.externalAddress = Address.toFullAddress(externalAddress, Integer.parseInt(externalPort));
}
}

View File

@ -120,7 +120,7 @@ public class PowerTransportService implements TransportService, InitializingBean
log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address);
this.engines.add(re);
this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter()));
this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.getHost(), address.getPort(), engineOutput.getTransporter()));
}
@Override

View File

@ -3,12 +3,14 @@ package tech.powerjob.worker;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.PropertyUtils;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.engine.EngineConfig;
@ -79,9 +81,13 @@ public class PowerJobWorker {
log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
}
// 初始化元数据
String workerAddress = NetUtils.getLocalHost() + ":" + config.getPort();
workerRuntime.setWorkerAddress(workerAddress);
// 初始化网络数据区别对待上报地址和本机绑定地址对外统一使用上报地址
String localBindIp = NetUtils.getLocalHost();
int localBindPort = config.getPort();
String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, localBindIp);
String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort));
log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort);
workerRuntime.setWorkerAddress(Address.toFullAddress(externalIp, Integer.parseInt(externalPort)));
// 初始化 线程池
final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());
@ -100,7 +106,7 @@ public class PowerJobWorker {
EngineConfig engineConfig = new EngineConfig()
.setType(config.getProtocol().name())
.setServerType(ServerType.WORKER)
.setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort()))
.setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort))
.setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));
EngineOutput engineOutput = remoteEngine.start(engineConfig);
@ -115,7 +121,7 @@ public class PowerJobWorker {
log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");
// 初始化日志系统
OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, workerRuntime.getTransporter(), serverDiscoveryService);
OmsLogHandler omsLogHandler = new OmsLogHandler(workerRuntime.getWorkerAddress(), workerRuntime.getTransporter(), serverDiscoveryService);
workerRuntime.setOmsLogHandler(omsLogHandler);
// 初始化存储