feat: akka remote impl

This commit is contained in:
tjq 2023-01-07 14:38:17 +08:00
parent b0b2c24571
commit 676388a988
5 changed files with 97 additions and 4 deletions

View File

@ -4,9 +4,13 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.actor.HandlerInfo;
import tech.powerjob.remote.framework.base.Address;
@ -25,6 +29,7 @@ import java.util.Map;
* @author tjq
* @since 2022/12/31
*/
@Slf4j
public class AkkaCSInitializer implements CSInitializer {
private ActorSystem actorSystem;
@ -50,6 +55,8 @@ public class AkkaCSInitializer implements CSInitializer {
Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
log.info("[PowerJob-AKKA] try to start AKKA System by config: {}", akkaFinalConfig);
// 启动时绑定当前的 actorSystemName
String actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType(), true);
this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig);
@ -66,7 +73,18 @@ public class AkkaCSInitializer implements CSInitializer {
@Override
public void bindHandlers(List<ActorInfo> actorInfos) {
// TODO: 考虑如何优雅绑定实在不行就暴力绑定到一个 actor 反正可以切协议
int cores = Runtime.getRuntime().availableProcessors();
actorInfos.forEach(actorInfo -> {
String rootPath = actorInfo.getAnno().path();
AkkaMappingService.ActorConfig actorConfig = AkkaMappingService.parseActorName(rootPath);
log.info("[PowerJob-AKKA] start to process actor[path={},config={}]", rootPath, JsonUtils.toJSONString(actorConfig));
actorSystem.actorOf(AkkaProxyActor.props(actorInfo)
.withDispatcher(actorConfig.getDispatcherName())
.withRouter(new RoundRobinPool(cores)), actorConfig.getActorName());
});
}
@Override

View File

@ -0,0 +1,49 @@
package tech.powerjob.remote.akka;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import java.util.Map;
/**
* 构建 Actor Mapping
*
* @author tjq
* @since 2023/1/7
*/
public class AkkaMappingService {
/**
* Actor's RootPath -> Akka Actor Name
*/
private static final Map<String, ActorConfig> RP_2_ACTOR_CFG = Maps.newHashMap();
static {
// TODO: 迁移时写入规则
}
private static final String DEFAULT_DISPATCH_NAME = "common-dispatcher";
/**
* 根据 actor rootPath 获取 Akka Actor Name不存在改写则使用当前路径
* @param actorRootPath actorRootPath
* @return actorName
*/
public static ActorConfig parseActorName(String actorRootPath) {
return RP_2_ACTOR_CFG.getOrDefault(actorRootPath,
new ActorConfig()
.setActorName(actorRootPath)
.setDispatcherName(DEFAULT_DISPATCH_NAME)
);
}
@Getter
@Setter
@Accessors(chain = true)
public static class ActorConfig {
private String actorName;
private String dispatcherName;
}
}

View File

@ -1,6 +1,7 @@
package tech.powerjob.remote.akka;
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.exception.PowerJobException;
@ -24,6 +25,10 @@ public class AkkaProxyActor extends AbstractActor {
private final Receive receive;
private final ActorInfo actorInfo;
public static Props props(ActorInfo actorInfo) {
return Props.create(AkkaProxyActor.class, () -> new AkkaProxyActor(actorInfo));
}
public AkkaProxyActor(ActorInfo actorInfo) {
this.actorInfo = actorInfo;
final ReceiveBuilder receiveBuilder = receiveBuilder();
@ -36,7 +41,7 @@ public class AkkaProxyActor extends AbstractActor {
}
final Class<?> bindClz = powerSerializeClz.get();
receiveBuilder.match(bindClz, req -> onReceiveProcessorReportTaskStatusReq(req, handlerInfo));
log.info("[PowerJobProxyActor] bind handler[{}] to [{}]", location, bindClz);
log.info("[PowerJob-AKKA] bind handler[{}] to [{}]", location, bindClz);
});
this.receive = receiveBuilder.build();
}
@ -54,7 +59,7 @@ public class AkkaProxyActor extends AbstractActor {
getSender().tell(ret, getSelf());
}
} catch (Exception e) {
log.error("[TaskTrackerProxyActor] process failed!", e);
log.error("[PowerJob-AKKA] process failed!", e);
}
}
}

View File

@ -20,6 +20,6 @@ public class AkkaTroubleshootingActor extends AbstractActor {
}
public void onReceiveDeadLetter(DeadLetter dl) {
log.warn("[TroubleshootingActor] receive DeadLetter: {}", dl);
log.warn("[PowerJob-AKKA] receive DeadLetter: {}", dl);
}
}

View File

@ -109,4 +109,25 @@ akka {
# Set to 1 for as fair as possible.
throughput = 5
}
##################### default config #####################
common-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 4.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 64
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
}