From d3140d05017ce3c6fc7ddc597c113ba72e9b305a Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 15 Jul 2023 22:22:38 +0800 Subject: [PATCH] feat: support non-LAN communication(server side) --- .../election/ServerElectionService.java | 22 ++++++++++--------- .../remote/transporter/ProtocolInfo.java | 6 +++++ 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java index b2a86791..0c384d6a 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java @@ -60,9 +60,11 @@ public class ServerElectionService { final String currentServer = request.getCurrentServer(); // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功 Optional localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol())); - if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) { - log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer); - return currentServer; + if (localProtocolInfoOpt.isPresent()) { + if (localProtocolInfoOpt.get().getExternalAddress().equals(currentServer) || localProtocolInfoOpt.get().getAddress().equals(currentServer)) { + log.info("[ServerElection] this server[{}] is worker[appId={}]'s current server, skip check", currentServer, request.getAppId()); + return currentServer; + } } } return getServer0(request); @@ -110,13 +112,13 @@ public class ServerElectionService { // 篡位,如果本机存在协议,则作为Server调度该 worker final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol); if (targetProtocolInfo != null) { - // 注意,写入 AppInfoDO#currentServer 的永远是 default 的地址,仅在返回的时候特殊处理为协议地址 + // 注意,写入 AppInfoDO#currentServer 的永远是 default 的绑定地址,仅在返回的时候特殊处理为协议地址 appInfo.setCurrentServer(transportService.defaultProtocol().getAddress()); appInfo.setGmtModified(new Date()); appInfoRepository.saveAndFlush(appInfo); log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId); - return targetProtocolInfo.getAddress(); + return targetProtocolInfo.getExternalAddress(); } }catch (Exception e) { log.error("[ServerElection] write new server to db failed for app {}.", appName, e); @@ -129,10 +131,10 @@ public class ServerElectionService { /** * 判断指定server是否存活 - * @param serverAddress 需要检测的server地址 + * @param serverAddress 需要检测的server地址(绑定的内网地址) * @param downServerCache 缓存,防止多次发送PING(这个QPS其实还蛮爆表的...) * @param protocol 协议,用于返回指定的地址 - * @return null or address + * @return null or address(外部地址) */ private String activeAddress(String serverAddress, Set downServerCache, String protocol) { @@ -156,9 +158,9 @@ public class ServerElectionService { final JSONObject protocolInfo = JsonUtils.parseObject(response.getData(), JSONObject.class).getJSONObject(protocol); if (protocolInfo != null) { downServerCache.remove(serverAddress); - final String protocolAddress = protocolInfo.toJavaObject(ProtocolInfo.class).getAddress(); - log.info("[ServerElection] server[{}] is active, it will be the master, final protocol address={}", serverAddress, protocolAddress); - return protocolAddress; + ProtocolInfo remoteProtocol = protocolInfo.toJavaObject(ProtocolInfo.class); + log.info("[ServerElection] server[{}] is active, it will be the master, final protocol={}", serverAddress, remoteProtocol); + return remoteProtocol.getExternalAddress(); } else { log.warn("[ServerElection] server[{}] is active but don't have target protocol", serverAddress); } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java index c27d939c..1832a7e0 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java @@ -30,6 +30,12 @@ public class ProtocolInfo { private transient Transporter transporter; + /** + * 序列化需要,必须存在无参构造方法!严禁删除 + */ + public ProtocolInfo() { + } + public ProtocolInfo(String protocol, String host, int port, Transporter transporter) { this.protocol = protocol; this.transporter = transporter;