diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/HttpUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/HttpUtils.java index fa65ff83..ceb2af52 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/utils/HttpUtils.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/HttpUtils.java @@ -21,6 +21,7 @@ public class HttpUtils { client = new OkHttpClient.Builder() .connectTimeout(1, TimeUnit.SECONDS) .readTimeout(5, TimeUnit.SECONDS) + .writeTimeout(10, TimeUnit.SECONDS) .build(); } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java index b6cc0e14..5b7ebbff 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java @@ -217,9 +217,10 @@ public class NetUtils { if (networkInterfaceChecker == null) { return false; } - log.info("[Net] try to choose NetworkInterface by NetworkInterfaceChecker, current NetworkInterface: {}", networkInterface); try { - return networkInterfaceChecker.ok(networkInterface, getFirstReachableInetAddress(networkInterface)); + boolean ok = networkInterfaceChecker.ok(networkInterface, getFirstReachableInetAddress(networkInterface)); + log.info("[Net] try to choose NetworkInterface by NetworkInterfaceChecker, current NetworkInterface[{}], ok: {}", networkInterface, ok); + return ok; } catch (Exception e) { log.warn("[Net] isPassedCheckerNetworkInterface failed, current networkInterface: {}", networkInterface, e); } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java index d2a837c8..1ec93d74 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java @@ -35,7 +35,13 @@ public class PingPongSocketServer implements PingPongServer { } // 接收连接,如果没有连接,accept() 方法会阻塞 try (Socket socket = serverSocket.accept();OutputStream outputStream = socket.getOutputStream();) { + + socket.setSoTimeout(2000); + socket.setKeepAlive(false); + outputStream.write(PingPongUtils.PONG.getBytes(StandardCharsets.UTF_8)); + // BufferedReader.readLine() 会等待直到遇到换行符(\n)或回车符(\r\n),才会返回一行内容。如果服务器发送的数据没有这些换行符,readLine() 会一直阻塞,直到超时 + outputStream.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8)); outputStream.flush(); } catch (Exception e) { if (!terminated) { diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongUtils.java index ee547254..821f7352 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongUtils.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongUtils.java @@ -30,6 +30,9 @@ public class PingPongUtils { try (Socket s = new Socket(targetIp, targetPort);InputStream is = s.getInputStream();OutputStream os = s.getOutputStream();BufferedReader br = new BufferedReader(new InputStreamReader(is))) { + s.setSoTimeout(2000); + s.setKeepAlive(false); + // 发送 PING 请求 os.write(PING.getBytes(StandardCharsets.UTF_8)); os.flush(); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/WorkerNetUtils.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/WorkerNetUtils.java index 8d9d0a31..2a017c3e 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/WorkerNetUtils.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/WorkerNetUtils.java @@ -35,42 +35,47 @@ public class WorkerNetUtils { pingPongServer = new PingPongSocketServer(); pingPongServer.initialize(port); log.info("[WorkerNetUtils] initialize PingPongSocketServer successfully~"); + + return NetUtils.getLocalHostWithNetworkInterfaceChecker(((networkInterface, inetAddress) -> { + + if (inetAddress == null) { + log.info("[WorkerNetUtils] [networkInterface:{}] skip due to inetAddress is null!", networkInterface); + return false; + } + + String workerIp = inetAddress.getHostAddress(); + for (String address : serverAddress) { + String url = String.format(SERVER_CONNECTIVITY_CHECK_URL_PATTERN, address, workerIp, port); + try { + String resp = HttpUtils.get(url); + log.info("[WorkerNetUtils] [networkInterface:{},inetAddress:{}] check connectivity by url[{}], response: {}", networkInterface, inetAddress, url, resp); + if (StringUtils.isNotEmpty(resp)) { + ResultDTO resultDTO = JsonUtils.parseObject(resp, ResultDTO.class); + boolean ret = Boolean.TRUE.toString().equalsIgnoreCase(String.valueOf(resultDTO.getData())); + if (ret) { + return true; + } + } + } catch (Exception ignore) { + } + } + return false; + })); + } catch (Exception e) { log.warn("[WorkerNetUtils] PingPongSocketServer failed to start, which may result in an incorrectly bound IP, please pay attention to the initialize log.", e); - } - - String localHostWithNetworkInterfaceChecker = NetUtils.getLocalHostWithNetworkInterfaceChecker(((networkInterface, inetAddress) -> { - - if (inetAddress == null) { - return false; - } - - String workerIp = inetAddress.getHostAddress(); - for (String address : serverAddress) { - String url = String.format(SERVER_CONNECTIVITY_CHECK_URL_PATTERN, address, workerIp, port); + } finally { + if (pingPongServer != null) { try { - String resp = HttpUtils.get(url); - log.info("[WorkerNetUtils] check connectivity by url[{}], response: {}", url, resp); - if (StringUtils.isNotEmpty(resp)) { - ResultDTO resultDTO = JsonUtils.parseObject(resp, ResultDTO.class); - return Boolean.TRUE.toString().equalsIgnoreCase(String.valueOf(resultDTO.getData())); - } - } catch (Exception ignore) { + pingPongServer.close(); + log.info("[WorkerNetUtils] close PingPongSocketServer successfully~"); + } catch (Exception e) { + log.warn("[WorkerNetUtils] close PingPongSocketServer failed!", e); } } - return false; - })); - - if (pingPongServer != null) { - try { - pingPongServer.close(); - log.info("[WorkerNetUtils] close PingPongSocketServer successfully~"); - } catch (Exception e) { - log.warn("[WorkerNetUtils] close PingPongSocketServer failed!", e); - } } - return localHostWithNetworkInterfaceChecker; + return NetUtils.getLocalHostWithNetworkInterfaceChecker(null); } }