From eda39a6372054d7af2ffcae43efe0115b7b3cef2 Mon Sep 17 00:00:00 2001 From: tjq Date: Mon, 8 Feb 2021 21:07:00 +0800 Subject: [PATCH] feat: support other protocol's server elect #209 --- .../service/ha/ServerSelectService.java | 23 +++++++++++-------- .../server/transport/TransportService.java | 23 +++++++++++-------- .../web/controller/ServerController.java | 4 ++-- .../background/ServerDiscoveryService.java | 2 +- 4 files changed, 31 insertions(+), 21 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java index 996c45bc..cadc85b2 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java @@ -3,7 +3,9 @@ package com.github.kfcfans.powerjob.server.service.ha; import akka.actor.ActorSelection; import akka.pattern.Patterns; import com.github.kfcfans.powerjob.common.PowerJobException; +import com.github.kfcfans.powerjob.common.Protocol; import com.github.kfcfans.powerjob.common.response.AskResponse; +import com.github.kfcfans.powerjob.server.transport.TransportService; import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter; import com.github.kfcfans.powerjob.server.handler.inner.requests.Ping; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; @@ -37,6 +39,8 @@ public class ServerSelectService { @Resource private LockService lockService; @Resource + private TransportService transportService; + @Resource private AppInfoRepository appInfoRepository; @Value("${oms.accurate.select.server.percentage}") @@ -47,17 +51,17 @@ public class ServerSelectService { private static final String SERVER_ELECT_LOCK = "server_elect_%d"; - public String getServer(Long appId, String currentServer) { + public String getServer(Long appId, String currentServer, String protocol) { if (!accurate()) { // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功 - if (AkkaStarter.getActorSystemAddress().equals(currentServer)) { + if (getThisServerAddress(protocol).equals(currentServer)) { return currentServer; } } - return getServer0(appId); + return getServer0(appId, protocol); } - private String getServer0(Long appId) { + private String getServer0(Long appId, String protocol) { Set downServerCache = Sets.newHashSet(); @@ -93,7 +97,7 @@ public class ServerSelectService { } // 篡位,本机作为Server - appInfo.setCurrentServer(AkkaStarter.getActorSystemAddress()); + appInfo.setCurrentServer(getThisServerAddress(protocol)); appInfo.setGmtModified(new Date()); appInfoRepository.saveAndFlush(appInfo); @@ -123,10 +127,6 @@ public class ServerSelectService { return false; } - if (AkkaStarter.getActorSystemAddress().equals(serverAddress)) { - return true; - } - Ping ping = new Ping(); ping.setCurrentTime(System.currentTimeMillis()); @@ -146,4 +146,9 @@ public class ServerSelectService { private boolean accurate() { return ThreadLocalRandom.current().nextInt(100) < accurateSelectServerPercentage; } + + private String getThisServerAddress(String protocol) { + Protocol pt = Protocol.of(protocol); + return transportService.getTransporter(pt).getAddress(); + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/TransportService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/TransportService.java index b3f9c926..f9192a4f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/TransportService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/TransportService.java @@ -8,7 +8,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.io.IOException; import java.util.List; import java.util.Map; @@ -33,20 +32,26 @@ public class TransportService { } public void tell(Protocol protocol, String address, OmsSerializable object) { - Transporter transporter = protocol2Transporter.get(protocol); - if (transporter == null) { - log.error("[TransportService] can't find transporter by protocol[{}], this is a bug!", protocol); - return; - } - transporter.tell(address, object); + getTransporter(protocol).tell(address, object); } public AskResponse ask(Protocol protocol, String address, OmsSerializable object) throws Exception { + + return getTransporter(protocol).ask(address, object); + } + + public Transporter getTransporter(Protocol protocol) { Transporter transporter = protocol2Transporter.get(protocol); if (transporter == null) { log.error("[TransportService] can't find transporter by protocol[{}], this is a bug!", protocol); - throw new IOException("can't find transporter by protocol: " + protocol); + throw new UnknownProtocolException("can't find transporter by protocol: " + protocol); + } + return transporter; + } + + public static class UnknownProtocolException extends RuntimeException { + public UnknownProtocolException(String message) { + super(message); } - return transporter.ask(address, object); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java index 742c68d6..3940493c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java @@ -43,8 +43,8 @@ public class ServerController { } @GetMapping("/acquire") - public ResultDTO acquireServer(Long appId, String currentServer) { - return ResultDTO.success(serverSelectService.getServer(appId, currentServer)); + public ResultDTO acquireServer(Long appId, String currentServer, String protocol) { + return ResultDTO.success(serverSelectService.getServer(appId, currentServer, protocol)); } @GetMapping("/hello") diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/ServerDiscoveryService.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/ServerDiscoveryService.java index b31b7653..8dd22416 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/ServerDiscoveryService.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/ServerDiscoveryService.java @@ -27,7 +27,7 @@ public class ServerDiscoveryService { // 配置的可发起HTTP请求的Server(IP:Port) private static final Map IP2ADDRESS = Maps.newHashMap(); // 服务发现地址 - private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d¤tServer=%s"; + private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d¤tServer=%s&protocol=AKKA"; // 失败次数 private static int FAILED_COUNT = 0; // 最大失败次数