fix: server elect bug

This commit is contained in:
tjq 2023-01-22 00:33:11 +08:00
parent 571b7cf3f2
commit e26f2df2d0
11 changed files with 154 additions and 42 deletions

View File

@ -22,6 +22,10 @@ public class OmsConstant {
public static final String NONE = "N/A";
public static final String COMMA = ",";
public static final String AND = "&";
public static final String EQUAL = "=";
public static final String LINE_SEPARATOR = "\r\n";
public static final String HTTP_HEADER_CONTENT_TYPE = "Content-Type";

View File

@ -0,0 +1,59 @@
package tech.powerjob.common.request;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.enums.Protocol;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
* 服务发现请求
*
* @author tjq
* @since 2023/1/21
*/
@Setter
@Accessors(chain = true)
public class ServerDiscoveryRequest implements Serializable {
private Long appId;
private String protocol;
private String currentServer;
private String clientVersion;
public Map<String, Object> toMap() {
Map<String, Object> ret = new HashMap<>();
ret.put("appId", appId);
ret.put("protocol", protocol);
if (StringUtils.isNotEmpty(currentServer)) {
ret.put("currentServer", currentServer);
}
if (StringUtils.isNotEmpty(clientVersion)) {
ret.put("clientVersion", clientVersion);
}
return ret;
}
public Long getAppId() {
return appId;
}
public String getProtocol() {
return Optional.ofNullable(protocol).orElse(Protocol.AKKA.name());
}
public String getCurrentServer() {
return currentServer;
}
public String getClientVersion() {
return clientVersion;
}
}

View File

@ -3,10 +3,11 @@ package tech.powerjob.common.serialize;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.exception.PowerJobException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.exception.PowerJobException;
import java.io.IOException;
@ -19,13 +20,11 @@ import java.io.IOException;
@Slf4j
public class JsonUtils {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
static {
OBJECT_MAPPER.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
//
OBJECT_MAPPER.configure(JsonParser.Feature.IGNORE_UNDEFINED, true);
}
private static final JsonMapper OBJECT_MAPPER = JsonMapper.builder()
.configure(MapperFeature.PROPAGATE_TRANSIENT_MARKER, true)
.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
.configure(JsonParser.Feature.IGNORE_UNDEFINED, true)
.build();
private JsonUtils(){
@ -34,7 +33,8 @@ public class JsonUtils {
public static String toJSONString(Object obj) {
try {
return OBJECT_MAPPER.writeValueAsString(obj);
}catch (Exception ignore) {
}catch (Exception e) {
log.error("[PowerJob] toJSONString failed", e);
}
return null;
}
@ -50,7 +50,8 @@ public class JsonUtils {
public static byte[] toBytes(Object obj) {
try {
return OBJECT_MAPPER.writeValueAsBytes(obj);
}catch (Exception ignore) {
}catch (Exception e) {
log.error("[PowerJob] serialize failed", e);
}
return null;
}

View File

@ -11,6 +11,7 @@ import tech.powerjob.remote.framework.actor.ProcessType;
import tech.powerjob.server.remote.server.election.Ping;
import tech.powerjob.server.remote.server.redirector.RemoteProcessReq;
import tech.powerjob.server.remote.server.redirector.RemoteRequestProcessor;
import tech.powerjob.server.remote.transporter.TransportService;
import static tech.powerjob.common.RemoteConstant.*;
@ -25,14 +26,19 @@ import static tech.powerjob.common.RemoteConstant.*;
@Actor(path = S4S_PATH)
public class FriendActor {
private static final String SK = "dGVuZ2ppcWlAZ21haWwuY29tIA==";
private final TransportService transportService;
public FriendActor(TransportService transportService) {
this.transportService = transportService;
}
/**
* 处理存活检测的请求
*/
@Handler(path = S4S_HANDLER_PING, processType = ProcessType.NO_BLOCKING)
public AskResponse onReceivePing(Ping ping) {
return AskResponse.succeed(SK);
final AskResponse response = AskResponse.succeed(transportService.allProtocols());
return response;
}
@Handler(path = S4S_HANDLER_PROCESS, processType = ProcessType.BLOCKING)

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.remote.server.election;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@ -7,11 +8,14 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.request.ServerDiscoveryRequest;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.remote.transporter.ProtocolInfo;
import tech.powerjob.server.remote.transporter.impl.ServerURLFactory;
import tech.powerjob.server.remote.transporter.TransportService;
@ -50,18 +54,21 @@ public class ServerElectionService {
this.accurateSelectServerPercentage = accurateSelectServerPercentage;
}
public String elect(Long appId, String protocol, String currentServer) {
public String elect(ServerDiscoveryRequest request) {
if (!accurate()) {
// 如果是本机就不需要查数据库那么复杂的操作了直接返回成功
if (transportService.defaultProtocol().getAddress().equals(currentServer)) {
return currentServer;
Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(request.getCurrentServer())) {
return request.getCurrentServer();
}
}
return getServer0(appId, protocol);
return getServer0(request);
}
private String getServer0(Long appId, String protocol) {
private String getServer0(ServerDiscoveryRequest discoveryRequest) {
final Long appId = discoveryRequest.getAppId();
final String protocol = discoveryRequest.getProtocol();
Set<String> downServerCache = Sets.newHashSet();
for (int i = 0; i < RETRY_TIMES; i++) {
@ -97,15 +104,17 @@ public class ServerElectionService {
return address;
}
// 篡位本机作为Server
// 注意写入 AppInfoDO#currentServer 的永远是 ActorSystem 的地址仅在返回的时候特殊处理 (4.3.0 更改为 HTTP)
final String selfDefaultAddress = transportService.defaultProtocol().getAddress();
appInfo.setCurrentServer(selfDefaultAddress);
appInfo.setGmtModified(new Date());
// 篡位如果本机存在协议则作为Server调度该 worker
final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
if (targetProtocolInfo != null) {
// 注意写入 AppInfoDO#currentServer 的永远是 default 的地址仅在返回的时候特殊处理为协议地址
appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
appInfo.setGmtModified(new Date());
appInfoRepository.saveAndFlush(appInfo);
log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
return selfDefaultAddress;
appInfoRepository.saveAndFlush(appInfo);
log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
return targetProtocolInfo.getAddress();
}
}catch (Exception e) {
log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
}finally {
@ -142,10 +151,14 @@ public class ServerElectionService {
if (response.isSuccess()) {
log.info("[ServerElection] server[{}] is active, it will be the master.", serverAddress);
downServerCache.remove(serverAddress);
return serverAddress;
// 检测通过的是远程 server 的暴露地址需要返回 worker 需要的协议地址
final JSONObject protocolInfo = JsonUtils.parseObject(response.getData(), JSONObject.class).getJSONObject(protocol);
if (protocolInfo != null) {
return protocolInfo.toJavaObject(ProtocolInfo.class).getAddress();
}
}
}catch (Exception e) {
log.warn("[ServerElection] server[{}] was down.", serverAddress);
log.warn("[ServerElection] server[{}] was down.", serverAddress, e);
}
downServerCache.add(serverAddress);
return null;

View File

@ -86,7 +86,7 @@ public class DesignateServerAspect {
}
// 目标IP与本地符合则本地执行
if (Objects.equals(targetServer, transportService.defaultProtocol())) {
if (Objects.equals(targetServer, transportService.defaultProtocol().getAddress())) {
return point.proceed();
}

View File

@ -1,6 +1,7 @@
package tech.powerjob.server.remote.transporter;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import tech.powerjob.remote.framework.transporter.Transporter;
@ -11,14 +12,15 @@ import tech.powerjob.remote.framework.transporter.Transporter;
* @since 2023/1/21
*/
@Getter
@Setter
@ToString
public class ProtocolInfo {
private final String protocol;
private String protocol;
private final String address;
private String address;
private final transient Transporter transporter;
private transient Transporter transporter;
public ProtocolInfo(String protocol, String address, Transporter transporter) {
this.protocol = protocol;

View File

@ -4,6 +4,7 @@ import tech.powerjob.common.PowerSerializable;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.URL;
import java.util.Map;
import java.util.concurrent.CompletionStage;
/**
@ -22,6 +23,12 @@ public interface TransportService {
*/
ProtocolInfo defaultProtocol();
/**
* 当前支持的全部协议
* @return allProtocols
*/
Map<String, ProtocolInfo> allProtocols();
void tell(String protocol, URL url, PowerSerializable request);
<T> CompletionStage<T> ask(String protocol, URL url, PowerSerializable request, Class<T> clz) throws RemotingException;

View File

@ -50,7 +50,7 @@ public class PowerTransportService implements TransportService, InitializingBean
private final Environment environment;
private ProtocolInfo defaultProtocol;
private final Map<String, ProtocolInfo> protocol2Transporter = Maps.newHashMap();
private final Map<String, ProtocolInfo> protocolName2Info = Maps.newHashMap();
private final List<RemoteEngine> engines = Lists.newArrayList();
@ -65,10 +65,15 @@ public class PowerTransportService implements TransportService, InitializingBean
return defaultProtocol;
}
@Override
public Map<String, ProtocolInfo> allProtocols() {
return protocolName2Info;
}
private ProtocolInfo fetchProtocolInfo(String protocol) {
// 兼容老版 worker 未上报 protocol 的情况
protocol = compatibleProtocol(protocol);
final ProtocolInfo protocolInfo = protocol2Transporter.get(protocol);
final ProtocolInfo protocolInfo = protocolName2Info.get(protocol);
if (protocolInfo == null) {
throw new IllegalArgumentException("can't find Transporter by protocol :" + protocol);
}
@ -105,7 +110,7 @@ public class PowerTransportService implements TransportService, InitializingBean
log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address);
this.engines.add(re);
this.protocol2Transporter.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter()));
this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter()));
}
@Override
@ -131,7 +136,7 @@ public class PowerTransportService implements TransportService, InitializingBean
choseDefault();
log.info("[PowerTransportService] initialize successfully!");
log.info("[PowerTransportService] ALL_PROTOCOLS: {}", protocol2Transporter);
log.info("[PowerTransportService] ALL_PROTOCOLS: {}", protocolName2Info);
}
/**
@ -165,7 +170,7 @@ public class PowerTransportService implements TransportService, InitializingBean
* HTTP 优先否则默认取第一个协议
*/
private void choseDefault() {
ProtocolInfo httpP = protocol2Transporter.get(Protocol.HTTP.name());
ProtocolInfo httpP = protocolName2Info.get(Protocol.HTTP.name());
if (httpP != null) {
log.info("[PowerTransportService] exist HTTP protocol, chose this as the default protocol!");
this.defaultProtocol = httpP;
@ -173,7 +178,7 @@ public class PowerTransportService implements TransportService, InitializingBean
}
String firstProtocol = activeProtocols.split(OmsConstant.COMMA)[0];
this.defaultProtocol = this.protocol2Transporter.get(firstProtocol);
this.defaultProtocol = this.protocolName2Info.get(firstProtocol);
log.info("[PowerTransportService] chose [{}] as the default protocol!", firstProtocol);
if (this.defaultProtocol == null) {

View File

@ -7,6 +7,7 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import tech.powerjob.common.request.ServerDiscoveryRequest;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
@ -47,8 +48,8 @@ public class ServerController {
}
@GetMapping("/acquire")
public ResultDTO<String> acquireServer(Long appId, String protocol, String currentServer) {
return ResultDTO.success(serverElectionService.elect(appId, protocol, currentServer));
public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
return ResultDTO.success(serverElectionService.elect(request));
}
@GetMapping("/hello")

View File

@ -1,9 +1,12 @@
package tech.powerjob.worker.background;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.request.ServerDiscoveryRequest;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CollectionUtils;
@ -37,7 +40,7 @@ public class ServerDiscoveryService {
/**
* 服务发现地址
*/
private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d&currentServer=%s&protocol=AKKA";
private static final String DISCOVERY_URL = "http://%s/server/acquire?%s";
/**
* 失败次数
*/
@ -131,7 +134,7 @@ public class ServerDiscoveryService {
@SuppressWarnings("rawtypes")
private String acquire(String httpServerAddress) {
String result = null;
String url = String.format(DISCOVERY_URL, httpServerAddress, appId, currentServerAddress);
String url = buildServerDiscoveryUrl(httpServerAddress);
try {
result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
}catch (Exception ignore) {
@ -147,4 +150,15 @@ public class ServerDiscoveryService {
}
return null;
}
private String buildServerDiscoveryUrl(String address) {
ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
.setAppId(appId)
.setCurrentServer(currentServerAddress)
.setProtocol(config.getProtocol().name().toUpperCase());
String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap());
return String.format(DISCOVERY_URL, address, query);
}
}