feat: HandlerLocation use serverType

This commit is contained in:
tjq 2023-01-20 09:00:52 +08:00
parent 3d5a5ac342
commit 38d6b16c74
4 changed files with 8 additions and 15 deletions

View File

@ -27,9 +27,9 @@ public class HandlerLocation implements Serializable {
*/ */
private String methodPath; private String methodPath;
/** /**
* 是否在本集群内用于兼容 AKKA 等除了IP还需要指定 system 访问的情况 * 调用的集群类型用于兼容 AKKA 等除了IP还需要指定 system 访问的情况
*/ */
private boolean insideCluster; private ServerType serverType;
public String toPath() { public String toPath() {
return String.format("/%s/%s", rootPath, methodPath); return String.format("/%s/%s", rootPath, methodPath);

View File

@ -58,7 +58,7 @@ public class AkkaCSInitializer implements CSInitializer {
log.info("[PowerJob-AKKA] try to start AKKA System by config: {}", akkaFinalConfig); log.info("[PowerJob-AKKA] try to start AKKA System by config: {}", akkaFinalConfig);
// 启动时绑定当前的 actorSystemName // 启动时绑定当前的 actorSystemName
String actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType(), true); String actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType());
this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig); this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig);
// 处理系统中产生的异常情况 // 处理系统中产生的异常情况
@ -70,7 +70,7 @@ public class AkkaCSInitializer implements CSInitializer {
@Override @Override
public Transporter buildTransporter() { public Transporter buildTransporter() {
return new AkkaTransporter(config.getServerType(), actorSystem); return new AkkaTransporter(actorSystem);
} }
@Override @Override

View File

@ -18,17 +18,12 @@ public class AkkaConstant {
/** /**
* 获取 actorSystem 名称 * 获取 actorSystem 名称
* @param serverType 当前服务器类型powerjob-server serverpowerjob-worker worker * @param serverType 当前服务器类型powerjob-server serverpowerjob-worker worker
* @param mine 是否输出当前服务器对应的 actorSystemNamemine = false 后倒置为目标服务器的 actorSystemName
* @return actorSystemName * @return actorSystemName
*/ */
public static String fetchActorSystemName(ServerType serverType, boolean mine) { public static String fetchActorSystemName(ServerType serverType) {
boolean outputServer = serverType == ServerType.SERVER;
if (!mine) {
outputServer = !outputServer;
}
return outputServer ? SERVER_ACTOR_SYSTEM_NAME : WORKER_ACTOR_SYSTEM_NAME; return serverType == ServerType.SERVER ? SERVER_ACTOR_SYSTEM_NAME : WORKER_ACTOR_SYSTEM_NAME;
} }
} }

View File

@ -28,7 +28,6 @@ import java.util.concurrent.ExecutorService;
*/ */
public class AkkaTransporter implements Transporter { public class AkkaTransporter implements Transporter {
private final ServerType serverType;
private final ActorSystem actorSystem; private final ActorSystem actorSystem;
@ -37,9 +36,8 @@ public class AkkaTransporter implements Transporter {
*/ */
private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s"; private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";
public AkkaTransporter(ServerType serverType, ActorSystem actorSystem) { public AkkaTransporter(ActorSystem actorSystem) {
this.actorSystem = actorSystem; this.actorSystem = actorSystem;
this.serverType = serverType;
} }
@Override @Override
@ -63,7 +61,7 @@ public class AkkaTransporter implements Transporter {
private ActorSelection fetchActorSelection(URL url) { private ActorSelection fetchActorSelection(URL url) {
HandlerLocation location = url.getLocation(); HandlerLocation location = url.getLocation();
String targetActorSystemName = AkkaConstant.fetchActorSystemName(serverType, location.isInsideCluster()); String targetActorSystemName = AkkaConstant.fetchActorSystemName(location.getServerType());
String targetActorName = AkkaMappingService.parseActorName(location.getRootPath()).getActorName(); String targetActorName = AkkaMappingService.parseActorName(location.getRootPath()).getActorName();