From e26f2df2d00d0cab725f237108a3f930957a37cf Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 22 Jan 2023 00:33:11 +0800 Subject: [PATCH] fix: server elect bug --- .../tech/powerjob/common/OmsConstant.java | 4 ++ .../request/ServerDiscoveryRequest.java | 59 +++++++++++++++++++ .../powerjob/common/serialize/JsonUtils.java | 23 ++++---- .../server/remote/server/FriendActor.java | 10 +++- .../election/ServerElectionService.java | 43 +++++++++----- .../redirector/DesignateServerAspect.java | 2 +- .../remote/transporter/ProtocolInfo.java | 8 ++- .../remote/transporter/TransportService.java | 7 +++ .../impl/PowerTransportService.java | 17 ++++-- .../web/controller/ServerController.java | 5 +- .../background/ServerDiscoveryService.java | 18 +++++- 11 files changed, 154 insertions(+), 42 deletions(-) create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/request/ServerDiscoveryRequest.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/OmsConstant.java b/powerjob-common/src/main/java/tech/powerjob/common/OmsConstant.java index bed70eca..f589e869 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/OmsConstant.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/OmsConstant.java @@ -22,6 +22,10 @@ public class OmsConstant { public static final String NONE = "N/A"; public static final String COMMA = ","; + + public static final String AND = "&"; + + public static final String EQUAL = "="; public static final String LINE_SEPARATOR = "\r\n"; public static final String HTTP_HEADER_CONTENT_TYPE = "Content-Type"; diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerDiscoveryRequest.java b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerDiscoveryRequest.java new file mode 100644 index 00000000..34a7d6bd --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerDiscoveryRequest.java @@ -0,0 +1,59 @@ +package tech.powerjob.common.request; + +import lombok.Setter; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.StringUtils; +import tech.powerjob.common.enums.Protocol; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * 服务发现请求 + * + * @author tjq + * @since 2023/1/21 + */ +@Setter +@Accessors(chain = true) +public class ServerDiscoveryRequest implements Serializable { + + private Long appId; + + private String protocol; + + private String currentServer; + + private String clientVersion; + + public Map toMap() { + Map ret = new HashMap<>(); + ret.put("appId", appId); + ret.put("protocol", protocol); + if (StringUtils.isNotEmpty(currentServer)) { + ret.put("currentServer", currentServer); + } + if (StringUtils.isNotEmpty(clientVersion)) { + ret.put("clientVersion", clientVersion); + } + return ret; + } + + public Long getAppId() { + return appId; + } + + public String getProtocol() { + return Optional.ofNullable(protocol).orElse(Protocol.AKKA.name()); + } + + public String getCurrentServer() { + return currentServer; + } + + public String getClientVersion() { + return clientVersion; + } +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java index 6809fa4c..ee40566a 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java @@ -3,10 +3,11 @@ package tech.powerjob.common.serialize; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.json.JsonMapper; import lombok.extern.slf4j.Slf4j; -import tech.powerjob.common.exception.PowerJobException; import org.apache.commons.lang3.exception.ExceptionUtils; +import tech.powerjob.common.exception.PowerJobException; import java.io.IOException; @@ -19,13 +20,11 @@ import java.io.IOException; @Slf4j public class JsonUtils { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - static { - OBJECT_MAPPER.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); - // - OBJECT_MAPPER.configure(JsonParser.Feature.IGNORE_UNDEFINED, true); - } + private static final JsonMapper OBJECT_MAPPER = JsonMapper.builder() + .configure(MapperFeature.PROPAGATE_TRANSIENT_MARKER, true) + .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) + .configure(JsonParser.Feature.IGNORE_UNDEFINED, true) + .build(); private JsonUtils(){ @@ -34,7 +33,8 @@ public class JsonUtils { public static String toJSONString(Object obj) { try { return OBJECT_MAPPER.writeValueAsString(obj); - }catch (Exception ignore) { + }catch (Exception e) { + log.error("[PowerJob] toJSONString failed", e); } return null; } @@ -50,7 +50,8 @@ public class JsonUtils { public static byte[] toBytes(Object obj) { try { return OBJECT_MAPPER.writeValueAsBytes(obj); - }catch (Exception ignore) { + }catch (Exception e) { + log.error("[PowerJob] serialize failed", e); } return null; } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java index 3174780d..0b5a848f 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java @@ -11,6 +11,7 @@ import tech.powerjob.remote.framework.actor.ProcessType; import tech.powerjob.server.remote.server.election.Ping; import tech.powerjob.server.remote.server.redirector.RemoteProcessReq; import tech.powerjob.server.remote.server.redirector.RemoteRequestProcessor; +import tech.powerjob.server.remote.transporter.TransportService; import static tech.powerjob.common.RemoteConstant.*; @@ -25,14 +26,19 @@ import static tech.powerjob.common.RemoteConstant.*; @Actor(path = S4S_PATH) public class FriendActor { - private static final String SK = "dGVuZ2ppcWlAZ21haWwuY29tIA=="; + private final TransportService transportService; + + public FriendActor(TransportService transportService) { + this.transportService = transportService; + } /** * 处理存活检测的请求 */ @Handler(path = S4S_HANDLER_PING, processType = ProcessType.NO_BLOCKING) public AskResponse onReceivePing(Ping ping) { - return AskResponse.succeed(SK); + final AskResponse response = AskResponse.succeed(transportService.allProtocols()); + return response; } @Handler(path = S4S_HANDLER_PROCESS, processType = ProcessType.BLOCKING) diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java index b3871159..37eb4639 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java @@ -1,5 +1,6 @@ package tech.powerjob.server.remote.server.election; +import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -7,11 +8,14 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import tech.powerjob.common.enums.Protocol; import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.request.ServerDiscoveryRequest; import tech.powerjob.common.response.AskResponse; +import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.remote.framework.base.URL; import tech.powerjob.server.extension.LockService; import tech.powerjob.server.persistence.remote.model.AppInfoDO; import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; +import tech.powerjob.server.remote.transporter.ProtocolInfo; import tech.powerjob.server.remote.transporter.impl.ServerURLFactory; import tech.powerjob.server.remote.transporter.TransportService; @@ -50,18 +54,21 @@ public class ServerElectionService { this.accurateSelectServerPercentage = accurateSelectServerPercentage; } - public String elect(Long appId, String protocol, String currentServer) { + public String elect(ServerDiscoveryRequest request) { if (!accurate()) { // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功 - if (transportService.defaultProtocol().getAddress().equals(currentServer)) { - return currentServer; + Optional localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol())); + if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(request.getCurrentServer())) { + return request.getCurrentServer(); } } - return getServer0(appId, protocol); + return getServer0(request); } - private String getServer0(Long appId, String protocol) { + private String getServer0(ServerDiscoveryRequest discoveryRequest) { + final Long appId = discoveryRequest.getAppId(); + final String protocol = discoveryRequest.getProtocol(); Set downServerCache = Sets.newHashSet(); for (int i = 0; i < RETRY_TIMES; i++) { @@ -97,15 +104,17 @@ public class ServerElectionService { return address; } - // 篡位,本机作为Server - // 注意,写入 AppInfoDO#currentServer 的永远是 ActorSystem 的地址,仅在返回的时候特殊处理 (4.3.0 更改为 HTTP) - final String selfDefaultAddress = transportService.defaultProtocol().getAddress(); - appInfo.setCurrentServer(selfDefaultAddress); - appInfo.setGmtModified(new Date()); + // 篡位,如果本机存在协议,则作为Server调度该 worker + final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol); + if (targetProtocolInfo != null) { + // 注意,写入 AppInfoDO#currentServer 的永远是 default 的地址,仅在返回的时候特殊处理为协议地址 + appInfo.setCurrentServer(transportService.defaultProtocol().getAddress()); + appInfo.setGmtModified(new Date()); - appInfoRepository.saveAndFlush(appInfo); - log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId); - return selfDefaultAddress; + appInfoRepository.saveAndFlush(appInfo); + log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId); + return targetProtocolInfo.getAddress(); + } }catch (Exception e) { log.error("[ServerElection] write new server to db failed for app {}.", appName, e); }finally { @@ -142,10 +151,14 @@ public class ServerElectionService { if (response.isSuccess()) { log.info("[ServerElection] server[{}] is active, it will be the master.", serverAddress); downServerCache.remove(serverAddress); - return serverAddress; + // 检测通过的是远程 server 的暴露地址,需要返回 worker 需要的协议地址 + final JSONObject protocolInfo = JsonUtils.parseObject(response.getData(), JSONObject.class).getJSONObject(protocol); + if (protocolInfo != null) { + return protocolInfo.toJavaObject(ProtocolInfo.class).getAddress(); + } } }catch (Exception e) { - log.warn("[ServerElection] server[{}] was down.", serverAddress); + log.warn("[ServerElection] server[{}] was down.", serverAddress, e); } downServerCache.add(serverAddress); return null; diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java index 1f178dde..ad906487 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java @@ -86,7 +86,7 @@ public class DesignateServerAspect { } // 目标IP与本地符合则本地执行 - if (Objects.equals(targetServer, transportService.defaultProtocol())) { + if (Objects.equals(targetServer, transportService.defaultProtocol().getAddress())) { return point.proceed(); } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java index e620d2df..0802e811 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java @@ -1,6 +1,7 @@ package tech.powerjob.server.remote.transporter; import lombok.Getter; +import lombok.Setter; import lombok.ToString; import tech.powerjob.remote.framework.transporter.Transporter; @@ -11,14 +12,15 @@ import tech.powerjob.remote.framework.transporter.Transporter; * @since 2023/1/21 */ @Getter +@Setter @ToString public class ProtocolInfo { - private final String protocol; + private String protocol; - private final String address; + private String address; - private final transient Transporter transporter; + private transient Transporter transporter; public ProtocolInfo(String protocol, String address, Transporter transporter) { this.protocol = protocol; diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/TransportService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/TransportService.java index 3124aa27..97e390e3 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/TransportService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/TransportService.java @@ -4,6 +4,7 @@ import tech.powerjob.common.PowerSerializable; import tech.powerjob.remote.framework.base.RemotingException; import tech.powerjob.remote.framework.base.URL; +import java.util.Map; import java.util.concurrent.CompletionStage; /** @@ -22,6 +23,12 @@ public interface TransportService { */ ProtocolInfo defaultProtocol(); + /** + * 当前支持的全部协议 + * @return allProtocols + */ + Map allProtocols(); + void tell(String protocol, URL url, PowerSerializable request); CompletionStage ask(String protocol, URL url, PowerSerializable request, Class clz) throws RemotingException; diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java index 26dad376..4dd39a75 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java @@ -50,7 +50,7 @@ public class PowerTransportService implements TransportService, InitializingBean private final Environment environment; private ProtocolInfo defaultProtocol; - private final Map protocol2Transporter = Maps.newHashMap(); + private final Map protocolName2Info = Maps.newHashMap(); private final List engines = Lists.newArrayList(); @@ -65,10 +65,15 @@ public class PowerTransportService implements TransportService, InitializingBean return defaultProtocol; } + @Override + public Map allProtocols() { + return protocolName2Info; + } + private ProtocolInfo fetchProtocolInfo(String protocol) { // 兼容老版 worker 未上报 protocol 的情况 protocol = compatibleProtocol(protocol); - final ProtocolInfo protocolInfo = protocol2Transporter.get(protocol); + final ProtocolInfo protocolInfo = protocolName2Info.get(protocol); if (protocolInfo == null) { throw new IllegalArgumentException("can't find Transporter by protocol :" + protocol); } @@ -105,7 +110,7 @@ public class PowerTransportService implements TransportService, InitializingBean log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address); this.engines.add(re); - this.protocol2Transporter.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter())); + this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter())); } @Override @@ -131,7 +136,7 @@ public class PowerTransportService implements TransportService, InitializingBean choseDefault(); log.info("[PowerTransportService] initialize successfully!"); - log.info("[PowerTransportService] ALL_PROTOCOLS: {}", protocol2Transporter); + log.info("[PowerTransportService] ALL_PROTOCOLS: {}", protocolName2Info); } /** @@ -165,7 +170,7 @@ public class PowerTransportService implements TransportService, InitializingBean * HTTP 优先,否则默认取第一个协议 */ private void choseDefault() { - ProtocolInfo httpP = protocol2Transporter.get(Protocol.HTTP.name()); + ProtocolInfo httpP = protocolName2Info.get(Protocol.HTTP.name()); if (httpP != null) { log.info("[PowerTransportService] exist HTTP protocol, chose this as the default protocol!"); this.defaultProtocol = httpP; @@ -173,7 +178,7 @@ public class PowerTransportService implements TransportService, InitializingBean } String firstProtocol = activeProtocols.split(OmsConstant.COMMA)[0]; - this.defaultProtocol = this.protocol2Transporter.get(firstProtocol); + this.defaultProtocol = this.protocolName2Info.get(firstProtocol); log.info("[PowerTransportService] chose [{}] as the default protocol!", firstProtocol); if (this.defaultProtocol == null) { diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java index 1565cdb0..5617a6c6 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java @@ -7,6 +7,7 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import tech.powerjob.common.request.ServerDiscoveryRequest; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.NetUtils; @@ -47,8 +48,8 @@ public class ServerController { } @GetMapping("/acquire") - public ResultDTO acquireServer(Long appId, String protocol, String currentServer) { - return ResultDTO.success(serverElectionService.elect(appId, protocol, currentServer)); + public ResultDTO acquireServer(ServerDiscoveryRequest request) { + return ResultDTO.success(serverElectionService.elect(request)); } @GetMapping("/hello") diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java index 89f07874..e7e1752c 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java @@ -1,9 +1,12 @@ package tech.powerjob.worker.background; +import com.google.common.base.Joiner; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import tech.powerjob.common.OmsConstant; import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.request.ServerDiscoveryRequest; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CollectionUtils; @@ -37,7 +40,7 @@ public class ServerDiscoveryService { /** * 服务发现地址 */ - private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d¤tServer=%s&protocol=AKKA"; + private static final String DISCOVERY_URL = "http://%s/server/acquire?%s"; /** * 失败次数 */ @@ -131,7 +134,7 @@ public class ServerDiscoveryService { @SuppressWarnings("rawtypes") private String acquire(String httpServerAddress) { String result = null; - String url = String.format(DISCOVERY_URL, httpServerAddress, appId, currentServerAddress); + String url = buildServerDiscoveryUrl(httpServerAddress); try { result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url)); }catch (Exception ignore) { @@ -147,4 +150,15 @@ public class ServerDiscoveryService { } return null; } + + private String buildServerDiscoveryUrl(String address) { + + ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest() + .setAppId(appId) + .setCurrentServer(currentServerAddress) + .setProtocol(config.getProtocol().name().toUpperCase()); + + String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap()); + return String.format(DISCOVERY_URL, address, query); + } }