fix: AKKA NAT BUG #929

This commit is contained in:
tjq 2024-08-11 00:20:34 +08:00
parent 6426424401
commit a1dad6c39e
6 changed files with 54 additions and 8 deletions

View File

@ -19,7 +19,14 @@ import java.io.Serializable;
@Accessors(chain = true) @Accessors(chain = true)
public class CSInitializerConfig implements Serializable { public class CSInitializerConfig implements Serializable {
/**
* 需要绑定的地址本地
*/
private Address bindAddress; private Address bindAddress;
/**
* 外部地址需要 NAT 等情况存在
*/
private Address externalAddress;
private ServerType serverType; private ServerType serverType;
} }

View File

@ -30,6 +30,10 @@ public class EngineConfig implements Serializable {
* 绑定的本地地址 * 绑定的本地地址
*/ */
private Address bindAddress; private Address bindAddress;
/**
* 外部地址需要 NAT 等情况存在
*/
private Address externalAddress;
/** /**
* actor实例交由使用侧自己实例化以便自行注入各种 bean * actor实例交由使用侧自己实例化以便自行注入各种 bean
*/ */

View File

@ -41,6 +41,7 @@ public class PowerJobRemoteEngine implements RemoteEngine {
csInitializer.init(new CSInitializerConfig() csInitializer.init(new CSInitializerConfig()
.setBindAddress(engineConfig.getBindAddress()) .setBindAddress(engineConfig.getBindAddress())
.setExternalAddress(engineConfig.getExternalAddress())
.setServerType(engineConfig.getServerType()) .setServerType(engineConfig.getServerType())
); );

View File

@ -9,6 +9,7 @@ import com.google.common.collect.Maps;
import com.typesafe.config.Config; import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.remote.framework.actor.ActorInfo; import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.Address;
@ -47,8 +48,22 @@ public class AkkaCSInitializer implements CSInitializer {
// 初始化 ActorSystemmacOS上 new ServerSocket 检测端口占用的方法并不生效可能是AKKA是Scala写的缘故没办法...只能靠异常重试了 // 初始化 ActorSystemmacOS上 new ServerSocket 检测端口占用的方法并不生效可能是AKKA是Scala写的缘故没办法...只能靠异常重试了
Map<String, Object> overrideConfig = Maps.newHashMap(); Map<String, Object> overrideConfig = Maps.newHashMap();
overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost());
overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort()); Address externalAddress = config.getExternalAddress();
if (externalAddress == null || StringUtils.equalsIgnoreCase(externalAddress.toFullAddress(), bindAddress.toFullAddress())) {
overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost());
overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort());
log.info("[PowerJob-AKKA] not exist externalIp, overrideConfig: {}", overrideConfig);
} else {
overrideConfig.put("akka.remote.artery.canonical.hostname", externalAddress.getHost());
overrideConfig.put("akka.remote.artery.canonical.port", externalAddress.getPort());
overrideConfig.put("akka.remote.artery.bind.hostname", "0.0.0.0");
overrideConfig.put("akka.remote.artery.bind.port", bindAddress.getPort());
log.info("[PowerJob-AKKA] exist externalAddress[{}], final overrideConfig: {}", externalAddress, overrideConfig);
}
Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG); Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);

View File

@ -104,23 +104,35 @@ public class PowerTransportService implements TransportService, InitializingBean
// 从构造器注入改为从 applicationContext 获取来避免循环依赖 // 从构造器注入改为从 applicationContext 获取来避免循环依赖
final Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(Actor.class); final Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(Actor.class);
log.info("[PowerTransportService] find Actor num={},names={}", beansWithAnnotation.size(), beansWithAnnotation.keySet()); log.info("[PowerTransportService] [{}] find Actor num={},names={}", protocol, beansWithAnnotation.size(), beansWithAnnotation.keySet());
Address address = new Address() Address address = new Address()
.setHost(NetUtils.getLocalHost()) .setHost(NetUtils.getLocalHost())
.setPort(port); .setPort(port);
ProtocolInfo protocolInfo = new ProtocolInfo(protocol, address.getHost(), address.getPort(), null);
EngineConfig engineConfig = new EngineConfig() EngineConfig engineConfig = new EngineConfig()
.setServerType(ServerType.SERVER) .setServerType(ServerType.SERVER)
.setType(protocol.toUpperCase()) .setType(protocol.toUpperCase())
.setBindAddress(address) .setBindAddress(address)
.setActorList(Lists.newArrayList(beansWithAnnotation.values())); .setActorList(Lists.newArrayList(beansWithAnnotation.values()));
log.info("[PowerTransportService] start to initialize RemoteEngine[type={},address={}]", protocol, address);
if (!StringUtils.equalsIgnoreCase(protocolInfo.getExternalAddress(), protocolInfo.getAddress())) {
Address externalAddress = Address.fromIpv4(protocolInfo.getExternalAddress());
engineConfig.setExternalAddress(externalAddress);
log.info("[PowerTransportService] [{}] exist externalAddress: {}", protocol, externalAddress);
}
log.info("[PowerTransportService] [{}] start to initialize RemoteEngine[address={}]", protocol, address);
RemoteEngine re = new PowerJobRemoteEngine(); RemoteEngine re = new PowerJobRemoteEngine();
final EngineOutput engineOutput = re.start(engineConfig); final EngineOutput engineOutput = re.start(engineConfig);
log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address); log.info("[PowerTransportService] [{}] start RemoteEngine[address={}] successfully", protocol, address);
this.engines.add(re); this.engines.add(re);
this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.getHost(), address.getPort(), engineOutput.getTransporter()));
protocolInfo.setTransporter(engineOutput.getTransporter());
this.protocolName2Info.put(protocol, protocolInfo);
} }
@Override @Override

View File

@ -3,6 +3,7 @@ package tech.powerjob.worker;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.PowerJobDKey; import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.model.WorkerAppInfo; import tech.powerjob.common.model.WorkerAppInfo;
import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.CommonUtils;
@ -85,10 +86,10 @@ public class PowerJobWorker {
workerRuntime.setAppInfo(appInfo); workerRuntime.setAppInfo(appInfo);
// 初始化网络数据区别对待上报地址和本机绑定地址对外统一使用上报地址 // 初始化网络数据区别对待上报地址和本机绑定地址对外统一使用上报地址
String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, localBindIp); String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, null);
String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort)); String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort));
log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort); log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort);
workerRuntime.setWorkerAddress(Address.toFullAddress(externalIp, Integer.parseInt(externalPort))); workerRuntime.setWorkerAddress(Address.toFullAddress(Optional.ofNullable(externalIp).orElse(localBindIp), Integer.parseInt(externalPort)));
// 初始化 线程池 // 初始化 线程池
final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig()); final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());
@ -110,6 +111,12 @@ public class PowerJobWorker {
.setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort)) .setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort))
.setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor)); .setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));
if (StringUtils.isNotEmpty(externalIp)) {
Address externalAddress = new Address().setHost(externalIp).setPort(Integer.parseInt(externalPort));
engineConfig.setExternalAddress(externalAddress);
log.info("[PowerJobWorker] [ADDRESS_INFO] exist externalIp, add external address to engine config: {}", externalAddress);
}
EngineOutput engineOutput = remoteEngine.start(engineConfig); EngineOutput engineOutput = remoteEngine.start(engineConfig);
workerRuntime.setTransporter(engineOutput.getTransporter()); workerRuntime.setTransporter(engineOutput.getTransporter());