From 770f30dd059f2e75895584e2e465ebe0be76eea9 Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 16 Feb 2021 14:11:08 +0800 Subject: [PATCH] fix: returned an incorrect address when using HTTP protocol --- .../remote/server/FriendRequestHandler.java | 5 ++-- .../DefaultServerElectionService.java | 29 ++++++++++++------- .../remote/transport/TransportService.java | 7 +++++ 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/FriendRequestHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/FriendRequestHandler.java index 2c246c1b..b6d8cef1 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/FriendRequestHandler.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/FriendRequestHandler.java @@ -2,13 +2,14 @@ package com.github.kfcfans.powerjob.server.remote.server; import akka.actor.AbstractActor; import com.alibaba.fastjson.JSONObject; -import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; import com.github.kfcfans.powerjob.server.remote.server.request.FriendQueryWorkerClusterStatusReq; import com.github.kfcfans.powerjob.server.remote.server.request.Ping; import com.github.kfcfans.powerjob.server.remote.server.request.RemoteProcessReq; +import com.github.kfcfans.powerjob.server.remote.transport.TransportService; import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.util.ReflectionUtils; @@ -38,7 +39,7 @@ public class FriendRequestHandler extends AbstractActor { * 处理存活检测的请求 */ private void onReceivePing(Ping ping) { - getSender().tell(AskResponse.succeed(System.currentTimeMillis() - ping.getCurrentTime()), getSelf()); + getSender().tell(AskResponse.succeed(TransportService.getAllAddress()), getSelf()); } /** diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/DefaultServerElectionService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/DefaultServerElectionService.java index 716ba850..236b24a3 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/DefaultServerElectionService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/DefaultServerElectionService.java @@ -2,9 +2,11 @@ package com.github.kfcfans.powerjob.server.remote.server.election; import akka.actor.ActorSelection; import akka.pattern.Patterns; +import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.Protocol; import com.github.kfcfans.powerjob.common.response.AskResponse; +import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.server.extension.LockService; import com.github.kfcfans.powerjob.server.extension.ServerElectionService; import com.github.kfcfans.powerjob.server.remote.server.request.Ping; @@ -75,8 +77,9 @@ public class DefaultServerElectionService implements ServerElectionService { } String appName = appInfoOpt.get().getAppName(); String originServer = appInfoOpt.get().getCurrentServer(); - if (isActive(originServer, downServerCache)) { - return originServer; + String activeAddress = activeAddress(originServer, downServerCache, protocol); + if (StringUtils.isNotEmpty(activeAddress)) { + return activeAddress; } // 无可用Server,重新进行Server选举,需要加锁 @@ -93,8 +96,9 @@ public class DefaultServerElectionService implements ServerElectionService { // 可能上一台机器已经完成了Server选举,需要再次判断 AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database.")); - if (isActive(appInfo.getCurrentServer(), downServerCache)) { - return appInfo.getCurrentServer(); + String address = activeAddress(originServer, downServerCache, protocol); + if (StringUtils.isNotEmpty(address)) { + return address; } // 篡位,本机作为Server @@ -118,15 +122,16 @@ public class DefaultServerElectionService implements ServerElectionService { * 判断指定server是否存活 * @param serverAddress 需要检测的server地址 * @param downServerCache 缓存,防止多次发送PING(这个QPS其实还蛮爆表的...) - * @return true 存活 / false down机 + * @param protocol 协议,用于返回指定的地址 + * @return null or address */ - private boolean isActive(String serverAddress, Set downServerCache) { + private String activeAddress(String serverAddress, Set downServerCache, String protocol) { if (downServerCache.contains(serverAddress)) { - return false; + return null; } if (StringUtils.isEmpty(serverAddress)) { - return false; + return null; } Ping ping = new Ping(); @@ -137,12 +142,14 @@ public class DefaultServerElectionService implements ServerElectionService { CompletionStage askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS)); AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS); downServerCache.remove(serverAddress); - return response.isSuccess(); + if (response.isSuccess()) { + return JsonUtils.parseObject(response.getData(), JSONObject.class).getString(protocol); + } }catch (Exception e) { log.warn("[ServerElection] server({}) was down.", serverAddress); } downServerCache.add(serverAddress); - return false; + return null; } private boolean accurate() { @@ -151,6 +158,6 @@ public class DefaultServerElectionService implements ServerElectionService { private String getProtocolServerAddress(String protocol) { Protocol pt = Protocol.of(protocol); - return transportService.getTransporter(pt).getAddress(); + return TransportService.getAllAddress().get(pt); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/transport/TransportService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/transport/TransportService.java index 53c764e5..bfd6b947 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/transport/TransportService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/transport/TransportService.java @@ -22,6 +22,8 @@ import java.util.Map; @Service public class TransportService { + private static final Map protocol2Address = Maps.newHashMap(); + @Getter private final Map protocol2Transporter = Maps.newConcurrentMap(); @@ -30,6 +32,7 @@ public class TransportService { transporters.forEach(t -> { log.info("[TransportService] Transporter[protocol:{},address:{}] registration successful!", t.getProtocol(), t.getAddress()); protocol2Transporter.put(t.getProtocol(), t); + protocol2Address.put(t.getProtocol(), t.getAddress()); }); } @@ -56,4 +59,8 @@ public class TransportService { super(message); } } + + public static Map getAllAddress() { + return protocol2Address; + } }