fix: returned an incorrect address when using HTTP protocol

This commit is contained in:
tjq 2021-02-16 14:11:08 +08:00
parent 83f6cf50a7
commit 770f30dd05
3 changed files with 28 additions and 13 deletions

View File

@ -2,13 +2,14 @@ package com.github.kfcfans.powerjob.server.remote.server;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.remote.server.request.FriendQueryWorkerClusterStatusReq; import com.github.kfcfans.powerjob.server.remote.server.request.FriendQueryWorkerClusterStatusReq;
import com.github.kfcfans.powerjob.server.remote.server.request.Ping; import com.github.kfcfans.powerjob.server.remote.server.request.Ping;
import com.github.kfcfans.powerjob.server.remote.server.request.RemoteProcessReq; import com.github.kfcfans.powerjob.server.remote.server.request.RemoteProcessReq;
import com.github.kfcfans.powerjob.server.remote.transport.TransportService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.util.ReflectionUtils; import org.springframework.util.ReflectionUtils;
@ -38,7 +39,7 @@ public class FriendRequestHandler extends AbstractActor {
* 处理存活检测的请求 * 处理存活检测的请求
*/ */
private void onReceivePing(Ping ping) { private void onReceivePing(Ping ping) {
getSender().tell(AskResponse.succeed(System.currentTimeMillis() - ping.getCurrentTime()), getSelf()); getSender().tell(AskResponse.succeed(TransportService.getAllAddress()), getSelf());
} }
/** /**

View File

@ -2,9 +2,11 @@ package com.github.kfcfans.powerjob.server.remote.server.election;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.Protocol; import com.github.kfcfans.powerjob.common.Protocol;
import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.server.extension.LockService; import com.github.kfcfans.powerjob.server.extension.LockService;
import com.github.kfcfans.powerjob.server.extension.ServerElectionService; import com.github.kfcfans.powerjob.server.extension.ServerElectionService;
import com.github.kfcfans.powerjob.server.remote.server.request.Ping; import com.github.kfcfans.powerjob.server.remote.server.request.Ping;
@ -75,8 +77,9 @@ public class DefaultServerElectionService implements ServerElectionService {
} }
String appName = appInfoOpt.get().getAppName(); String appName = appInfoOpt.get().getAppName();
String originServer = appInfoOpt.get().getCurrentServer(); String originServer = appInfoOpt.get().getCurrentServer();
if (isActive(originServer, downServerCache)) { String activeAddress = activeAddress(originServer, downServerCache, protocol);
return originServer; if (StringUtils.isNotEmpty(activeAddress)) {
return activeAddress;
} }
// 无可用Server重新进行Server选举需要加锁 // 无可用Server重新进行Server选举需要加锁
@ -93,8 +96,9 @@ public class DefaultServerElectionService implements ServerElectionService {
// 可能上一台机器已经完成了Server选举需要再次判断 // 可能上一台机器已经完成了Server选举需要再次判断
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database.")); AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
if (isActive(appInfo.getCurrentServer(), downServerCache)) { String address = activeAddress(originServer, downServerCache, protocol);
return appInfo.getCurrentServer(); if (StringUtils.isNotEmpty(address)) {
return address;
} }
// 篡位本机作为Server // 篡位本机作为Server
@ -118,15 +122,16 @@ public class DefaultServerElectionService implements ServerElectionService {
* 判断指定server是否存活 * 判断指定server是否存活
* @param serverAddress 需要检测的server地址 * @param serverAddress 需要检测的server地址
* @param downServerCache 缓存防止多次发送PING这个QPS其实还蛮爆表的... * @param downServerCache 缓存防止多次发送PING这个QPS其实还蛮爆表的...
* @return true 存活 / false down机 * @param protocol 协议用于返回指定的地址
* @return null or address
*/ */
private boolean isActive(String serverAddress, Set<String> downServerCache) { private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {
if (downServerCache.contains(serverAddress)) { if (downServerCache.contains(serverAddress)) {
return false; return null;
} }
if (StringUtils.isEmpty(serverAddress)) { if (StringUtils.isEmpty(serverAddress)) {
return false; return null;
} }
Ping ping = new Ping(); Ping ping = new Ping();
@ -137,12 +142,14 @@ public class DefaultServerElectionService implements ServerElectionService {
CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS)); CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS); AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
downServerCache.remove(serverAddress); downServerCache.remove(serverAddress);
return response.isSuccess(); if (response.isSuccess()) {
return JsonUtils.parseObject(response.getData(), JSONObject.class).getString(protocol);
}
}catch (Exception e) { }catch (Exception e) {
log.warn("[ServerElection] server({}) was down.", serverAddress); log.warn("[ServerElection] server({}) was down.", serverAddress);
} }
downServerCache.add(serverAddress); downServerCache.add(serverAddress);
return false; return null;
} }
private boolean accurate() { private boolean accurate() {
@ -151,6 +158,6 @@ public class DefaultServerElectionService implements ServerElectionService {
private String getProtocolServerAddress(String protocol) { private String getProtocolServerAddress(String protocol) {
Protocol pt = Protocol.of(protocol); Protocol pt = Protocol.of(protocol);
return transportService.getTransporter(pt).getAddress(); return TransportService.getAllAddress().get(pt);
} }
} }

View File

@ -22,6 +22,8 @@ import java.util.Map;
@Service @Service
public class TransportService { public class TransportService {
private static final Map<Protocol, String> protocol2Address = Maps.newHashMap();
@Getter @Getter
private final Map<Protocol, Transporter> protocol2Transporter = Maps.newConcurrentMap(); private final Map<Protocol, Transporter> protocol2Transporter = Maps.newConcurrentMap();
@ -30,6 +32,7 @@ public class TransportService {
transporters.forEach(t -> { transporters.forEach(t -> {
log.info("[TransportService] Transporter[protocol:{},address:{}] registration successful!", t.getProtocol(), t.getAddress()); log.info("[TransportService] Transporter[protocol:{},address:{}] registration successful!", t.getProtocol(), t.getAddress());
protocol2Transporter.put(t.getProtocol(), t); protocol2Transporter.put(t.getProtocol(), t);
protocol2Address.put(t.getProtocol(), t.getAddress());
}); });
} }
@ -56,4 +59,8 @@ public class TransportService {
super(message); super(message);
} }
} }
public static Map<Protocol, String> getAllAddress() {
return protocol2Address;
}
} }