diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java index b93e9206..3fc3b2a4 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java @@ -4,9 +4,13 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.DeadLetter; import akka.actor.Props; +import akka.routing.RoundRobinPool; import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.remote.framework.actor.ActorInfo; import tech.powerjob.remote.framework.actor.HandlerInfo; import tech.powerjob.remote.framework.base.Address; @@ -25,6 +29,7 @@ import java.util.Map; * @author tjq * @since 2022/12/31 */ +@Slf4j public class AkkaCSInitializer implements CSInitializer { private ActorSystem actorSystem; @@ -50,6 +55,8 @@ public class AkkaCSInitializer implements CSInitializer { Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG); Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); + log.info("[PowerJob-AKKA] try to start AKKA System by config: {}", akkaFinalConfig); + // 启动时绑定当前的 actorSystemName String actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType(), true); this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig); @@ -66,7 +73,18 @@ public class AkkaCSInitializer implements CSInitializer { @Override public void bindHandlers(List actorInfos) { - // TODO: 考虑如何优雅绑定(实在不行就暴力绑定到一个 actor 上,反正可以切协议) + int cores = Runtime.getRuntime().availableProcessors(); + actorInfos.forEach(actorInfo -> { + String rootPath = actorInfo.getAnno().path(); + AkkaMappingService.ActorConfig actorConfig = AkkaMappingService.parseActorName(rootPath); + + log.info("[PowerJob-AKKA] start to process actor[path={},config={}]", rootPath, JsonUtils.toJSONString(actorConfig)); + + actorSystem.actorOf(AkkaProxyActor.props(actorInfo) + .withDispatcher(actorConfig.getDispatcherName()) + .withRouter(new RoundRobinPool(cores)), actorConfig.getActorName()); + + }); } @Override diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaMappingService.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaMappingService.java new file mode 100644 index 00000000..ff115eeb --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaMappingService.java @@ -0,0 +1,49 @@ +package tech.powerjob.remote.akka; + +import com.google.common.collect.Maps; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; + +import java.util.Map; + +/** + * 构建 Actor Mapping + * + * @author tjq + * @since 2023/1/7 + */ +public class AkkaMappingService { + + /** + * Actor's RootPath -> Akka Actor Name + */ + private static final Map RP_2_ACTOR_CFG = Maps.newHashMap(); + + static { + // TODO: 迁移时写入规则 + } + + private static final String DEFAULT_DISPATCH_NAME = "common-dispatcher"; + + /** + * 根据 actor 的 rootPath 获取 Akka Actor Name,不存在改写则使用当前路径 + * @param actorRootPath actorRootPath + * @return actorName + */ + public static ActorConfig parseActorName(String actorRootPath) { + return RP_2_ACTOR_CFG.getOrDefault(actorRootPath, + new ActorConfig() + .setActorName(actorRootPath) + .setDispatcherName(DEFAULT_DISPATCH_NAME) + ); + } + + @Getter + @Setter + @Accessors(chain = true) + public static class ActorConfig { + private String actorName; + private String dispatcherName; + } +} diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProxyActor.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProxyActor.java index 70cf2e1c..15700cc0 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProxyActor.java +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProxyActor.java @@ -1,6 +1,7 @@ package tech.powerjob.remote.akka; import akka.actor.AbstractActor; +import akka.actor.Props; import akka.japi.pf.ReceiveBuilder; import lombok.extern.slf4j.Slf4j; import tech.powerjob.common.exception.PowerJobException; @@ -24,6 +25,10 @@ public class AkkaProxyActor extends AbstractActor { private final Receive receive; private final ActorInfo actorInfo; + public static Props props(ActorInfo actorInfo) { + return Props.create(AkkaProxyActor.class, () -> new AkkaProxyActor(actorInfo)); + } + public AkkaProxyActor(ActorInfo actorInfo) { this.actorInfo = actorInfo; final ReceiveBuilder receiveBuilder = receiveBuilder(); @@ -36,7 +41,7 @@ public class AkkaProxyActor extends AbstractActor { } final Class bindClz = powerSerializeClz.get(); receiveBuilder.match(bindClz, req -> onReceiveProcessorReportTaskStatusReq(req, handlerInfo)); - log.info("[PowerJobProxyActor] bind handler[{}] to [{}]", location, bindClz); + log.info("[PowerJob-AKKA] bind handler[{}] to [{}]", location, bindClz); }); this.receive = receiveBuilder.build(); } @@ -54,7 +59,7 @@ public class AkkaProxyActor extends AbstractActor { getSender().tell(ret, getSelf()); } } catch (Exception e) { - log.error("[TaskTrackerProxyActor] process failed!", e); + log.error("[PowerJob-AKKA] process failed!", e); } } } diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTroubleshootingActor.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTroubleshootingActor.java index bcec9c8a..80153648 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTroubleshootingActor.java +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTroubleshootingActor.java @@ -20,6 +20,6 @@ public class AkkaTroubleshootingActor extends AbstractActor { } public void onReceiveDeadLetter(DeadLetter dl) { - log.warn("[TroubleshootingActor] receive DeadLetter: {}", dl); + log.warn("[PowerJob-AKKA] receive DeadLetter: {}", dl); } } diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/resources/powerjob.akka.conf b/powerjob-remote/powerjob-remote-impl-akka/src/main/resources/powerjob.akka.conf index 34b04a4c..001a4177 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/src/main/resources/powerjob.akka.conf +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/resources/powerjob.akka.conf @@ -109,4 +109,25 @@ akka { # Set to 1 for as fair as possible. throughput = 5 } + + ##################### default config ##################### + common-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 4.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 64 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 10 + } } \ No newline at end of file