diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java index a17370d8..b6cc0e14 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java @@ -77,6 +77,10 @@ public class NetUtils { * @return 本机 IP 地址 */ public static String getLocalHost() { + return getLocalHostWithNetworkInterfaceChecker(null); + } + + public static String getLocalHostWithNetworkInterfaceChecker(NetworkInterfaceChecker networkInterfaceChecker) { if (HOST_ADDRESS != null) { return HOST_ADDRESS; } @@ -87,7 +91,7 @@ public class NetUtils { return HOST_ADDRESS = addressFromJVM; } - InetAddress address = getLocalAddress(); + InetAddress address = getLocalAddress(networkInterfaceChecker); if (address != null) { return HOST_ADDRESS = address.getHostAddress(); } @@ -107,19 +111,19 @@ public class NetUtils { * * @return first valid local IP */ - public static InetAddress getLocalAddress() { + public static InetAddress getLocalAddress(NetworkInterfaceChecker networkInterfaceChecker) { if (LOCAL_ADDRESS != null) { return LOCAL_ADDRESS; } - InetAddress localAddress = getLocalAddress0(); + InetAddress localAddress = getLocalAddress0(networkInterfaceChecker); LOCAL_ADDRESS = localAddress; return localAddress; } - private static InetAddress getLocalAddress0() { + private static InetAddress getLocalAddress0(NetworkInterfaceChecker networkInterfaceChecker) { // @since 2.7.6, choose the {@link NetworkInterface} first try { - InetAddress addressOp = getFirstReachableInetAddress( findNetworkInterface()); + InetAddress addressOp = getFirstReachableInetAddress( findNetworkInterface(networkInterfaceChecker)); if (addressOp != null) { return addressOp; } @@ -169,7 +173,7 @@ public class NetUtils { * @return If no {@link NetworkInterface} is available , return null * @since 2.7.6 */ - public static NetworkInterface findNetworkInterface() { + public static NetworkInterface findNetworkInterface(NetworkInterfaceChecker networkInterfaceChecker) { List validNetworkInterfaces = emptyList(); try { @@ -184,7 +188,11 @@ public class NetUtils { // Try to find the preferred one for (NetworkInterface networkInterface : validNetworkInterfaces) { if (isPreferredNetworkInterface(networkInterface)) { - log.info("[Net] use preferred network interface: {}", networkInterface.getDisplayName()); + log.info("[Net] use preferred network interface: {}", networkInterface); + return networkInterface; + } + if (isPassedCheckNetworkInterface(networkInterface, networkInterfaceChecker)) { + log.info("[Net] use PassedCheckNetworkInterface: {}", networkInterface); return networkInterface; } } @@ -199,6 +207,25 @@ public class NetUtils { return first(validNetworkInterfaces); } + /** + * 通过用户方法判断是否为目标网卡 + * @param networkInterface networkInterface + * @param networkInterfaceChecker 判断方法 + * @return true or false + */ + static boolean isPassedCheckNetworkInterface(NetworkInterface networkInterface, NetworkInterfaceChecker networkInterfaceChecker) { + if (networkInterfaceChecker == null) { + return false; + } + log.info("[Net] try to choose NetworkInterface by NetworkInterfaceChecker, current NetworkInterface: {}", networkInterface); + try { + return networkInterfaceChecker.ok(networkInterface, getFirstReachableInetAddress(networkInterface)); + } catch (Exception e) { + log.warn("[Net] isPassedCheckerNetworkInterface failed, current networkInterface: {}", networkInterface, e); + } + return false; + } + private static Optional toValidAddress(InetAddress address) { if (address instanceof Inet6Address) { Inet6Address v6Address = (Inet6Address) address; @@ -352,4 +379,9 @@ public class NetUtils { } return false; } + + @FunctionalInterface + public interface NetworkInterfaceChecker { + boolean ok(NetworkInterface networkInterface, InetAddress inetAddress); + } } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongServer.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongServer.java new file mode 100644 index 00000000..53e4f0c3 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongServer.java @@ -0,0 +1,14 @@ +package tech.powerjob.common.utils.net; + +import java.io.Closeable; + +/** + * socket 服务器,用于进行连通性测试 + * + * @author tjq + * @since 2024/2/8 + */ +public interface PingPongServer extends Closeable { + + void initialize(int port) throws Exception; +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java new file mode 100644 index 00000000..21090bfa --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java @@ -0,0 +1,57 @@ +package tech.powerjob.common.utils.net; + +import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.utils.CommonUtils; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; + +/** + * 简易服务器 + * + * @author tjq + * @since 2024/2/8 + */ +@Slf4j +public class PingPongSocketServer implements PingPongServer { + + private Thread thread; + + private ServerSocket serverSocket; + + private transient boolean terminated = false; + + @Override + public void initialize(int port) throws Exception{ + serverSocket = new ServerSocket(port); + + thread = new Thread(() -> { + while (true) { + if (terminated) { + return; + } + // 接收连接,如果没有连接,accept() 方法会阻塞 + try (Socket socket = serverSocket.accept();OutputStream outputStream = socket.getOutputStream();) { + outputStream.write(PingPongUtils.PONG.getBytes(StandardCharsets.UTF_8)); + outputStream.flush(); + } catch (Exception e) { + if (!terminated) { + log.warn("[PingPongSocketServer] process accepted socket failed!", e); + } + } + } + }, "PingPongSocketServer-Thread"); + + thread.start(); + } + + @Override + public void close() throws IOException { + terminated = true; + CommonUtils.executeIgnoreException(() -> serverSocket.close()); + thread.interrupt(); + } +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongUtils.java new file mode 100644 index 00000000..ee547254 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongUtils.java @@ -0,0 +1,53 @@ +package tech.powerjob.common.utils.net; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; + +import java.io.*; +import java.net.Socket; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; + +/** + * socket 连通性助手 + * + * @author tjq + * @since 2024/2/8 + */ +@Slf4j +public class PingPongUtils { + + static final String PING = "ping"; + static final String PONG = "pong"; + + /** + * 验证目标 IP 和 端口的连通性 + * @param targetIp 目标 IP + * @param targetPort 目标端口 + * @return true or false + */ + public static boolean checkConnectivity(String targetIp, int targetPort) { + + try (Socket s = new Socket(targetIp, targetPort);InputStream is = s.getInputStream();OutputStream os = s.getOutputStream();BufferedReader br = new BufferedReader(new InputStreamReader(is))) { + + // 发送 PING 请求 + os.write(PING.getBytes(StandardCharsets.UTF_8)); + os.flush(); + + //读取服务器返回的消息 + String content = br.readLine(); + + if (PONG.equalsIgnoreCase(content)) { + return true; + } + } catch (UnknownHostException e) { + log.warn("[SocketConnectivityUtils] unknown host: {}:{}", targetIp, targetPort); + } catch (IOException e) { + log.warn("[SocketConnectivityUtils] IOException: {}:{}, msg: {}", targetIp, targetPort, ExceptionUtils.getMessage(e)); + } catch (Exception e) { + log.error("[SocketConnectivityUtils] unknown exception for check ip: {}:{}", targetIp, targetPort, e); + } + + return false; + } +} diff --git a/powerjob-common/src/test/java/tech/powerjob/common/utils/net/PingPongSocketServerTest.java b/powerjob-common/src/test/java/tech/powerjob/common/utils/net/PingPongSocketServerTest.java new file mode 100644 index 00000000..0d559111 --- /dev/null +++ b/powerjob-common/src/test/java/tech/powerjob/common/utils/net/PingPongSocketServerTest.java @@ -0,0 +1,31 @@ +package tech.powerjob.common.utils.net; + +import org.junit.jupiter.api.Test; +import tech.powerjob.common.utils.NetUtils; + +/** + * desc + * + * @author tjq + * @since 2024/2/8 + */ +class PingPongSocketServerTest { + + @Test + void test() throws Exception { + + int port = 8877; + + PingPongSocketServer pingPongSocketServer = new PingPongSocketServer(); + pingPongSocketServer.initialize(port); + + System.out.println("[PingPongSocketServerTest] finished initialize"); + + assert PingPongUtils.checkConnectivity(NetUtils.getLocalHost(), port); + + assert !PingPongUtils.checkConnectivity(NetUtils.getLocalHost(), port + 1); + + pingPongSocketServer.close(); + System.out.println("[PingPongSocketServerTest] finished close"); + } +} \ No newline at end of file diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java index 60eaebd6..c476e162 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java @@ -11,7 +11,7 @@ import tech.powerjob.common.model.WorkerAppInfo; import tech.powerjob.common.request.ServerDiscoveryRequest; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.common.utils.CommonUtils; -import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.common.utils.net.PingPongUtils; import tech.powerjob.server.common.aware.ServerInfoAware; import tech.powerjob.server.common.module.ServerInfo; import tech.powerjob.server.persistence.remote.model.AppInfoDO; @@ -66,6 +66,17 @@ public class ServerController implements ServerInfoAware { return ResultDTO.success(serverElectionService.elect(request)); } + @GetMapping("/checkConnectivity") + public ResultDTO checkConnectivity(String targetIp, Integer targetPort) { + try { + boolean ret = PingPongUtils.checkConnectivity(targetIp, targetPort); + return ResultDTO.success(ret); + } catch (Throwable t) { + return ResultDTO.failed(t); + } + } + + @GetMapping("/hello") public ResultDTO ping(@RequestParam(required = false) boolean debug) { JSONObject res = new JSONObject(); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index eceac34c..404da4ca 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -6,7 +6,6 @@ import lombok.extern.slf4j.Slf4j; import tech.powerjob.common.PowerJobDKey; import tech.powerjob.common.model.WorkerAppInfo; import tech.powerjob.common.utils.CommonUtils; -import tech.powerjob.common.utils.NetUtils; import tech.powerjob.common.utils.PropertyUtils; import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.ServerType; @@ -24,6 +23,7 @@ import tech.powerjob.worker.background.discovery.ServerDiscoveryService; import tech.powerjob.worker.common.PowerBannerPrinter; import tech.powerjob.worker.common.PowerJobWorkerConfig; import tech.powerjob.worker.common.WorkerRuntime; +import tech.powerjob.worker.common.utils.WorkerNetUtils; import tech.powerjob.worker.core.executor.ExecutorManager; import tech.powerjob.worker.extension.processor.ProcessorFactory; import tech.powerjob.worker.persistence.TaskPersistenceService; @@ -74,13 +74,16 @@ public class PowerJobWorker { try { PowerBannerPrinter.print(); + + // 在发第一个请求之前,完成真正 IP 的解析 + int localBindPort = config.getPort(); + String localBindIp = WorkerNetUtils.parseLocalBindIp(localBindPort, config.getServerAddress()); + // 校验 appName WorkerAppInfo appInfo = serverDiscoveryService.assertApp(); workerRuntime.setAppInfo(appInfo); // 初始化网络数据,区别对待上报地址和本机绑定地址(对外统一使用上报地址) - String localBindIp = NetUtils.getLocalHost(); - int localBindPort = config.getPort(); String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, localBindIp); String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort)); log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/WorkerNetUtils.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/WorkerNetUtils.java new file mode 100644 index 00000000..8d9d0a31 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/WorkerNetUtils.java @@ -0,0 +1,76 @@ +package tech.powerjob.worker.common.utils; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import tech.powerjob.common.response.ResultDTO; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.HttpUtils; +import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.common.utils.net.PingPongServer; +import tech.powerjob.common.utils.net.PingPongSocketServer; + +import java.util.List; + +/** + * PowerJob Worker 专用的网络工具类 + * + * @author tjq + * @since 2024/2/8 + */ +@Slf4j +public class WorkerNetUtils { + + private static final String SERVER_CONNECTIVITY_CHECK_URL_PATTERN = "http://%s/server/checkConnectivity?targetIp=%s&targetPort=%d"; + + /** + * 多网卡情况下,解析可与 server 通讯的本地 IP 地址 + * @param port 目标端口 + * @param serverAddress server 服务地址 + * @return 本机IP + */ + public static String parseLocalBindIp(int port, List serverAddress) { + PingPongServer pingPongServer = null; + + try { + pingPongServer = new PingPongSocketServer(); + pingPongServer.initialize(port); + log.info("[WorkerNetUtils] initialize PingPongSocketServer successfully~"); + } catch (Exception e) { + log.warn("[WorkerNetUtils] PingPongSocketServer failed to start, which may result in an incorrectly bound IP, please pay attention to the initialize log.", e); + } + + String localHostWithNetworkInterfaceChecker = NetUtils.getLocalHostWithNetworkInterfaceChecker(((networkInterface, inetAddress) -> { + + if (inetAddress == null) { + return false; + } + + String workerIp = inetAddress.getHostAddress(); + for (String address : serverAddress) { + String url = String.format(SERVER_CONNECTIVITY_CHECK_URL_PATTERN, address, workerIp, port); + try { + String resp = HttpUtils.get(url); + log.info("[WorkerNetUtils] check connectivity by url[{}], response: {}", url, resp); + if (StringUtils.isNotEmpty(resp)) { + ResultDTO resultDTO = JsonUtils.parseObject(resp, ResultDTO.class); + return Boolean.TRUE.toString().equalsIgnoreCase(String.valueOf(resultDTO.getData())); + } + } catch (Exception ignore) { + } + } + return false; + })); + + if (pingPongServer != null) { + try { + pingPongServer.close(); + log.info("[WorkerNetUtils] close PingPongSocketServer successfully~"); + } catch (Exception e) { + log.warn("[WorkerNetUtils] close PingPongSocketServer failed!", e); + } + } + + return localHostWithNetworkInterfaceChecker; + } + +}