feat: Optimizing IP acquisition logic with PingPongSocketServer #762

This commit is contained in:
tjq 2024-02-08 19:41:40 +08:00
parent 61aecc6354
commit b29e265e42
8 changed files with 288 additions and 11 deletions

View File

@ -77,6 +77,10 @@ public class NetUtils {
* @return 本机 IP 地址
*/
public static String getLocalHost() {
return getLocalHostWithNetworkInterfaceChecker(null);
}
public static String getLocalHostWithNetworkInterfaceChecker(NetworkInterfaceChecker networkInterfaceChecker) {
if (HOST_ADDRESS != null) {
return HOST_ADDRESS;
}
@ -87,7 +91,7 @@ public class NetUtils {
return HOST_ADDRESS = addressFromJVM;
}
InetAddress address = getLocalAddress();
InetAddress address = getLocalAddress(networkInterfaceChecker);
if (address != null) {
return HOST_ADDRESS = address.getHostAddress();
}
@ -107,19 +111,19 @@ public class NetUtils {
*
* @return first valid local IP
*/
public static InetAddress getLocalAddress() {
public static InetAddress getLocalAddress(NetworkInterfaceChecker networkInterfaceChecker) {
if (LOCAL_ADDRESS != null) {
return LOCAL_ADDRESS;
}
InetAddress localAddress = getLocalAddress0();
InetAddress localAddress = getLocalAddress0(networkInterfaceChecker);
LOCAL_ADDRESS = localAddress;
return localAddress;
}
private static InetAddress getLocalAddress0() {
private static InetAddress getLocalAddress0(NetworkInterfaceChecker networkInterfaceChecker) {
// @since 2.7.6, choose the {@link NetworkInterface} first
try {
InetAddress addressOp = getFirstReachableInetAddress( findNetworkInterface());
InetAddress addressOp = getFirstReachableInetAddress( findNetworkInterface(networkInterfaceChecker));
if (addressOp != null) {
return addressOp;
}
@ -169,7 +173,7 @@ public class NetUtils {
* @return If no {@link NetworkInterface} is available , return <code>null</code>
* @since 2.7.6
*/
public static NetworkInterface findNetworkInterface() {
public static NetworkInterface findNetworkInterface(NetworkInterfaceChecker networkInterfaceChecker) {
List<NetworkInterface> validNetworkInterfaces = emptyList();
try {
@ -184,7 +188,11 @@ public class NetUtils {
// Try to find the preferred one
for (NetworkInterface networkInterface : validNetworkInterfaces) {
if (isPreferredNetworkInterface(networkInterface)) {
log.info("[Net] use preferred network interface: {}", networkInterface.getDisplayName());
log.info("[Net] use preferred network interface: {}", networkInterface);
return networkInterface;
}
if (isPassedCheckNetworkInterface(networkInterface, networkInterfaceChecker)) {
log.info("[Net] use PassedCheckNetworkInterface: {}", networkInterface);
return networkInterface;
}
}
@ -199,6 +207,25 @@ public class NetUtils {
return first(validNetworkInterfaces);
}
/**
* 通过用户方法判断是否为目标网卡
* @param networkInterface networkInterface
* @param networkInterfaceChecker 判断方法
* @return true or false
*/
static boolean isPassedCheckNetworkInterface(NetworkInterface networkInterface, NetworkInterfaceChecker networkInterfaceChecker) {
if (networkInterfaceChecker == null) {
return false;
}
log.info("[Net] try to choose NetworkInterface by NetworkInterfaceChecker, current NetworkInterface: {}", networkInterface);
try {
return networkInterfaceChecker.ok(networkInterface, getFirstReachableInetAddress(networkInterface));
} catch (Exception e) {
log.warn("[Net] isPassedCheckerNetworkInterface failed, current networkInterface: {}", networkInterface, e);
}
return false;
}
private static Optional<InetAddress> toValidAddress(InetAddress address) {
if (address instanceof Inet6Address) {
Inet6Address v6Address = (Inet6Address) address;
@ -352,4 +379,9 @@ public class NetUtils {
}
return false;
}
@FunctionalInterface
public interface NetworkInterfaceChecker {
boolean ok(NetworkInterface networkInterface, InetAddress inetAddress);
}
}

View File

@ -0,0 +1,14 @@
package tech.powerjob.common.utils.net;
import java.io.Closeable;
/**
* socket 服务器用于进行连通性测试
*
* @author tjq
* @since 2024/2/8
*/
public interface PingPongServer extends Closeable {
void initialize(int port) throws Exception;
}

View File

@ -0,0 +1,57 @@
package tech.powerjob.common.utils.net;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.utils.CommonUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
/**
* 简易服务器
*
* @author tjq
* @since 2024/2/8
*/
@Slf4j
public class PingPongSocketServer implements PingPongServer {
private Thread thread;
private ServerSocket serverSocket;
private transient boolean terminated = false;
@Override
public void initialize(int port) throws Exception{
serverSocket = new ServerSocket(port);
thread = new Thread(() -> {
while (true) {
if (terminated) {
return;
}
// 接收连接如果没有连接accept() 方法会阻塞
try (Socket socket = serverSocket.accept();OutputStream outputStream = socket.getOutputStream();) {
outputStream.write(PingPongUtils.PONG.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
} catch (Exception e) {
if (!terminated) {
log.warn("[PingPongSocketServer] process accepted socket failed!", e);
}
}
}
}, "PingPongSocketServer-Thread");
thread.start();
}
@Override
public void close() throws IOException {
terminated = true;
CommonUtils.executeIgnoreException(() -> serverSocket.close());
thread.interrupt();
}
}

View File

@ -0,0 +1,53 @@
package tech.powerjob.common.utils.net;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import java.io.*;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
/**
* socket 连通性助手
*
* @author tjq
* @since 2024/2/8
*/
@Slf4j
public class PingPongUtils {
static final String PING = "ping";
static final String PONG = "pong";
/**
* 验证目标 IP 端口的连通性
* @param targetIp 目标 IP
* @param targetPort 目标端口
* @return true or false
*/
public static boolean checkConnectivity(String targetIp, int targetPort) {
try (Socket s = new Socket(targetIp, targetPort);InputStream is = s.getInputStream();OutputStream os = s.getOutputStream();BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
// 发送 PING 请求
os.write(PING.getBytes(StandardCharsets.UTF_8));
os.flush();
//读取服务器返回的消息
String content = br.readLine();
if (PONG.equalsIgnoreCase(content)) {
return true;
}
} catch (UnknownHostException e) {
log.warn("[SocketConnectivityUtils] unknown host: {}:{}", targetIp, targetPort);
} catch (IOException e) {
log.warn("[SocketConnectivityUtils] IOException: {}:{}, msg: {}", targetIp, targetPort, ExceptionUtils.getMessage(e));
} catch (Exception e) {
log.error("[SocketConnectivityUtils] unknown exception for check ip: {}:{}", targetIp, targetPort, e);
}
return false;
}
}

View File

@ -0,0 +1,31 @@
package tech.powerjob.common.utils.net;
import org.junit.jupiter.api.Test;
import tech.powerjob.common.utils.NetUtils;
/**
* desc
*
* @author tjq
* @since 2024/2/8
*/
class PingPongSocketServerTest {
@Test
void test() throws Exception {
int port = 8877;
PingPongSocketServer pingPongSocketServer = new PingPongSocketServer();
pingPongSocketServer.initialize(port);
System.out.println("[PingPongSocketServerTest] finished initialize");
assert PingPongUtils.checkConnectivity(NetUtils.getLocalHost(), port);
assert !PingPongUtils.checkConnectivity(NetUtils.getLocalHost(), port + 1);
pingPongSocketServer.close();
System.out.println("[PingPongSocketServerTest] finished close");
}
}

View File

@ -11,7 +11,7 @@ import tech.powerjob.common.model.WorkerAppInfo;
import tech.powerjob.common.request.ServerDiscoveryRequest;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.net.PingPongUtils;
import tech.powerjob.server.common.aware.ServerInfoAware;
import tech.powerjob.server.common.module.ServerInfo;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
@ -66,6 +66,17 @@ public class ServerController implements ServerInfoAware {
return ResultDTO.success(serverElectionService.elect(request));
}
@GetMapping("/checkConnectivity")
public ResultDTO<Boolean> checkConnectivity(String targetIp, Integer targetPort) {
try {
boolean ret = PingPongUtils.checkConnectivity(targetIp, targetPort);
return ResultDTO.success(ret);
} catch (Throwable t) {
return ResultDTO.failed(t);
}
}
@GetMapping("/hello")
public ResultDTO<JSONObject> ping(@RequestParam(required = false) boolean debug) {
JSONObject res = new JSONObject();

View File

@ -6,7 +6,6 @@ import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.model.WorkerAppInfo;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.PropertyUtils;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.ServerType;
@ -24,6 +23,7 @@ import tech.powerjob.worker.background.discovery.ServerDiscoveryService;
import tech.powerjob.worker.common.PowerBannerPrinter;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.WorkerNetUtils;
import tech.powerjob.worker.core.executor.ExecutorManager;
import tech.powerjob.worker.extension.processor.ProcessorFactory;
import tech.powerjob.worker.persistence.TaskPersistenceService;
@ -74,13 +74,16 @@ public class PowerJobWorker {
try {
PowerBannerPrinter.print();
// 在发第一个请求之前完成真正 IP 的解析
int localBindPort = config.getPort();
String localBindIp = WorkerNetUtils.parseLocalBindIp(localBindPort, config.getServerAddress());
// 校验 appName
WorkerAppInfo appInfo = serverDiscoveryService.assertApp();
workerRuntime.setAppInfo(appInfo);
// 初始化网络数据区别对待上报地址和本机绑定地址对外统一使用上报地址
String localBindIp = NetUtils.getLocalHost();
int localBindPort = config.getPort();
String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, localBindIp);
String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort));
log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort);

View File

@ -0,0 +1,76 @@
package tech.powerjob.worker.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.net.PingPongServer;
import tech.powerjob.common.utils.net.PingPongSocketServer;
import java.util.List;
/**
* PowerJob Worker 专用的网络工具类
*
* @author tjq
* @since 2024/2/8
*/
@Slf4j
public class WorkerNetUtils {
private static final String SERVER_CONNECTIVITY_CHECK_URL_PATTERN = "http://%s/server/checkConnectivity?targetIp=%s&targetPort=%d";
/**
* 多网卡情况下解析可与 server 通讯的本地 IP 地址
* @param port 目标端口
* @param serverAddress server 服务地址
* @return 本机IP
*/
public static String parseLocalBindIp(int port, List<String> serverAddress) {
PingPongServer pingPongServer = null;
try {
pingPongServer = new PingPongSocketServer();
pingPongServer.initialize(port);
log.info("[WorkerNetUtils] initialize PingPongSocketServer successfully~");
} catch (Exception e) {
log.warn("[WorkerNetUtils] PingPongSocketServer failed to start, which may result in an incorrectly bound IP, please pay attention to the initialize log.", e);
}
String localHostWithNetworkInterfaceChecker = NetUtils.getLocalHostWithNetworkInterfaceChecker(((networkInterface, inetAddress) -> {
if (inetAddress == null) {
return false;
}
String workerIp = inetAddress.getHostAddress();
for (String address : serverAddress) {
String url = String.format(SERVER_CONNECTIVITY_CHECK_URL_PATTERN, address, workerIp, port);
try {
String resp = HttpUtils.get(url);
log.info("[WorkerNetUtils] check connectivity by url[{}], response: {}", url, resp);
if (StringUtils.isNotEmpty(resp)) {
ResultDTO<?> resultDTO = JsonUtils.parseObject(resp, ResultDTO.class);
return Boolean.TRUE.toString().equalsIgnoreCase(String.valueOf(resultDTO.getData()));
}
} catch (Exception ignore) {
}
}
return false;
}));
if (pingPongServer != null) {
try {
pingPongServer.close();
log.info("[WorkerNetUtils] close PingPongSocketServer successfully~");
} catch (Exception e) {
log.warn("[WorkerNetUtils] close PingPongSocketServer failed!", e);
}
}
return localHostWithNetworkInterfaceChecker;
}
}