mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: support non-LAN communication(server side)
This commit is contained in:
parent
7318fed73a
commit
d3140d0501
@ -60,11 +60,13 @@ public class ServerElectionService {
|
||||
final String currentServer = request.getCurrentServer();
|
||||
// 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功
|
||||
Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
|
||||
if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
|
||||
log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer);
|
||||
if (localProtocolInfoOpt.isPresent()) {
|
||||
if (localProtocolInfoOpt.get().getExternalAddress().equals(currentServer) || localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
|
||||
log.info("[ServerElection] this server[{}] is worker[appId={}]'s current server, skip check", currentServer, request.getAppId());
|
||||
return currentServer;
|
||||
}
|
||||
}
|
||||
}
|
||||
return getServer0(request);
|
||||
}
|
||||
|
||||
@ -110,13 +112,13 @@ public class ServerElectionService {
|
||||
// 篡位,如果本机存在协议,则作为Server调度该 worker
|
||||
final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
|
||||
if (targetProtocolInfo != null) {
|
||||
// 注意,写入 AppInfoDO#currentServer 的永远是 default 的地址,仅在返回的时候特殊处理为协议地址
|
||||
// 注意,写入 AppInfoDO#currentServer 的永远是 default 的绑定地址,仅在返回的时候特殊处理为协议地址
|
||||
appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
|
||||
appInfo.setGmtModified(new Date());
|
||||
|
||||
appInfoRepository.saveAndFlush(appInfo);
|
||||
log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
|
||||
return targetProtocolInfo.getAddress();
|
||||
return targetProtocolInfo.getExternalAddress();
|
||||
}
|
||||
}catch (Exception e) {
|
||||
log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
|
||||
@ -129,10 +131,10 @@ public class ServerElectionService {
|
||||
|
||||
/**
|
||||
* 判断指定server是否存活
|
||||
* @param serverAddress 需要检测的server地址
|
||||
* @param serverAddress 需要检测的server地址(绑定的内网地址)
|
||||
* @param downServerCache 缓存,防止多次发送PING(这个QPS其实还蛮爆表的...)
|
||||
* @param protocol 协议,用于返回指定的地址
|
||||
* @return null or address
|
||||
* @return null or address(外部地址)
|
||||
*/
|
||||
private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {
|
||||
|
||||
@ -156,9 +158,9 @@ public class ServerElectionService {
|
||||
final JSONObject protocolInfo = JsonUtils.parseObject(response.getData(), JSONObject.class).getJSONObject(protocol);
|
||||
if (protocolInfo != null) {
|
||||
downServerCache.remove(serverAddress);
|
||||
final String protocolAddress = protocolInfo.toJavaObject(ProtocolInfo.class).getAddress();
|
||||
log.info("[ServerElection] server[{}] is active, it will be the master, final protocol address={}", serverAddress, protocolAddress);
|
||||
return protocolAddress;
|
||||
ProtocolInfo remoteProtocol = protocolInfo.toJavaObject(ProtocolInfo.class);
|
||||
log.info("[ServerElection] server[{}] is active, it will be the master, final protocol={}", serverAddress, remoteProtocol);
|
||||
return remoteProtocol.getExternalAddress();
|
||||
} else {
|
||||
log.warn("[ServerElection] server[{}] is active but don't have target protocol", serverAddress);
|
||||
}
|
||||
|
@ -30,6 +30,12 @@ public class ProtocolInfo {
|
||||
|
||||
private transient Transporter transporter;
|
||||
|
||||
/**
|
||||
* 序列化需要,必须存在无参构造方法!严禁删除
|
||||
*/
|
||||
public ProtocolInfo() {
|
||||
}
|
||||
|
||||
public ProtocolInfo(String protocol, String host, int port, Transporter transporter) {
|
||||
this.protocol = protocol;
|
||||
this.transporter = transporter;
|
||||
|
Loading…
x
Reference in New Issue
Block a user