diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java index a64051fd..0546e263 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java @@ -19,7 +19,14 @@ import java.io.Serializable; @Accessors(chain = true) public class CSInitializerConfig implements Serializable { + /** + * 需要绑定的地址(本地) + */ private Address bindAddress; + /** + * 外部地址(需要 NAT 等情况存在) + */ + private Address externalAddress; private ServerType serverType; } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java index a9b82961..d038114c 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java @@ -30,6 +30,10 @@ public class EngineConfig implements Serializable { * 绑定的本地地址 */ private Address bindAddress; + /** + * 外部地址(需要 NAT 等情况存在) + */ + private Address externalAddress; /** * actor实例,交由使用侧自己实例化以便自行注入各种 bean */ diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java index 2f9cbc6b..0c0e5cbb 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java @@ -41,6 +41,7 @@ public class PowerJobRemoteEngine implements RemoteEngine { csInitializer.init(new CSInitializerConfig() .setBindAddress(engineConfig.getBindAddress()) + .setExternalAddress(engineConfig.getExternalAddress()) .setServerType(engineConfig.getServerType()) ); diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java index cad05a8f..f00a0b29 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java @@ -9,6 +9,7 @@ import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.remote.framework.actor.ActorInfo; import tech.powerjob.remote.framework.base.Address; @@ -47,8 +48,22 @@ public class AkkaCSInitializer implements CSInitializer { // 初始化 ActorSystem(macOS上 new ServerSocket 检测端口占用的方法并不生效,可能是AKKA是Scala写的缘故?没办法...只能靠异常重试了) Map overrideConfig = Maps.newHashMap(); - overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost()); - overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort()); + + Address externalAddress = config.getExternalAddress(); + + if (externalAddress == null || StringUtils.equalsIgnoreCase(externalAddress.toFullAddress(), bindAddress.toFullAddress())) { + overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost()); + overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort()); + log.info("[PowerJob-AKKA] not exist externalIp, overrideConfig: {}", overrideConfig); + } else { + overrideConfig.put("akka.remote.artery.canonical.hostname", externalAddress.getHost()); + overrideConfig.put("akka.remote.artery.canonical.port", externalAddress.getPort()); + + overrideConfig.put("akka.remote.artery.bind.hostname", "0.0.0.0"); + overrideConfig.put("akka.remote.artery.bind.port", bindAddress.getPort()); + + log.info("[PowerJob-AKKA] exist externalAddress[{}], final overrideConfig: {}", externalAddress, overrideConfig); + } Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG); Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java index 331a83ad..53748862 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java @@ -104,23 +104,35 @@ public class PowerTransportService implements TransportService, InitializingBean // 从构造器注入改为从 applicationContext 获取来避免循环依赖 final Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(Actor.class); - log.info("[PowerTransportService] find Actor num={},names={}", beansWithAnnotation.size(), beansWithAnnotation.keySet()); + log.info("[PowerTransportService] [{}] find Actor num={},names={}", protocol, beansWithAnnotation.size(), beansWithAnnotation.keySet()); Address address = new Address() .setHost(NetUtils.getLocalHost()) .setPort(port); + + ProtocolInfo protocolInfo = new ProtocolInfo(protocol, address.getHost(), address.getPort(), null); + EngineConfig engineConfig = new EngineConfig() .setServerType(ServerType.SERVER) .setType(protocol.toUpperCase()) .setBindAddress(address) .setActorList(Lists.newArrayList(beansWithAnnotation.values())); - log.info("[PowerTransportService] start to initialize RemoteEngine[type={},address={}]", protocol, address); + + if (!StringUtils.equalsIgnoreCase(protocolInfo.getExternalAddress(), protocolInfo.getAddress())) { + Address externalAddress = Address.fromIpv4(protocolInfo.getExternalAddress()); + engineConfig.setExternalAddress(externalAddress); + log.info("[PowerTransportService] [{}] exist externalAddress: {}", protocol, externalAddress); + } + + log.info("[PowerTransportService] [{}] start to initialize RemoteEngine[address={}]", protocol, address); RemoteEngine re = new PowerJobRemoteEngine(); final EngineOutput engineOutput = re.start(engineConfig); - log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address); + log.info("[PowerTransportService] [{}] start RemoteEngine[address={}] successfully", protocol, address); this.engines.add(re); - this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.getHost(), address.getPort(), engineOutput.getTransporter())); + + protocolInfo.setTransporter(engineOutput.getTransporter()); + this.protocolName2Info.put(protocol, protocolInfo); } @Override diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index 053ad4c2..7d6e0e61 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -3,6 +3,7 @@ package tech.powerjob.worker; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import tech.powerjob.common.PowerJobDKey; import tech.powerjob.common.model.WorkerAppInfo; import tech.powerjob.common.utils.CommonUtils; @@ -85,10 +86,10 @@ public class PowerJobWorker { workerRuntime.setAppInfo(appInfo); // 初始化网络数据,区别对待上报地址和本机绑定地址(对外统一使用上报地址) - String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, localBindIp); + String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, null); String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort)); log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort); - workerRuntime.setWorkerAddress(Address.toFullAddress(externalIp, Integer.parseInt(externalPort))); + workerRuntime.setWorkerAddress(Address.toFullAddress(Optional.ofNullable(externalIp).orElse(localBindIp), Integer.parseInt(externalPort))); // 初始化 线程池 final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig()); @@ -110,6 +111,12 @@ public class PowerJobWorker { .setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort)) .setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor)); + if (StringUtils.isNotEmpty(externalIp)) { + Address externalAddress = new Address().setHost(externalIp).setPort(Integer.parseInt(externalPort)); + engineConfig.setExternalAddress(externalAddress); + log.info("[PowerJobWorker] [ADDRESS_INFO] exist externalIp, add external address to engine config: {}", externalAddress); + } + EngineOutput engineOutput = remoteEngine.start(engineConfig); workerRuntime.setTransporter(engineOutput.getTransporter());