diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java index b409347e..420de9c1 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java @@ -17,6 +17,7 @@ public class RemoteConstant { public static final String Task_TRACKER_ACTOR_NAME = "task_tracker"; public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker"; public static final String WORKER_ACTOR_NAME = "worker"; + public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting"; public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf"; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java index d173801f..41439727 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java @@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.akka; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.routing.RoundRobinPool; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.akka.actors.FriendActor; @@ -58,7 +59,9 @@ public class OhMyServer { Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig); - actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME); + actorSystem.actorOf(Props.create(ServerActor.class) + .withDispatcher("akka.server-actor-dispatcher") + .withRouter(new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)), RemoteConstant.SERVER_ACTOR_NAME); actorSystem.actorOf(Props.create(FriendActor.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME); log.info("[OhMyServer] OhMyServer's akka system start successfully, using time {}.", stopwatch); diff --git a/powerjob-server/src/main/resources/oms-server.akka.conf b/powerjob-server/src/main/resources/oms-server.akka.conf index fe6ab702..d7ff91c0 100644 --- a/powerjob-server/src/main/resources/oms-server.akka.conf +++ b/powerjob-server/src/main/resources/oms-server.akka.conf @@ -20,4 +20,24 @@ akka { canonical.port = 0 } } + + server-actor-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 + } } \ No newline at end of file diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java index 4175bfe7..0b763742 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java @@ -1,7 +1,10 @@ package com.github.kfcfans.powerjob.worker; +import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.DeadLetter; import akka.actor.Props; +import akka.routing.RoundRobinPool; import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.response.ResultDTO; @@ -9,6 +12,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.HttpUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; +import com.github.kfcfans.powerjob.worker.actors.TroubleshootingActor; import com.github.kfcfans.powerjob.worker.actors.ProcessorTrackerActor; import com.github.kfcfans.powerjob.worker.actors.TaskTrackerActor; import com.github.kfcfans.powerjob.worker.actors.WorkerActor; @@ -93,11 +97,20 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME); Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); + int cores = Runtime.getRuntime().availableProcessors(); actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig); - actorSystem.actorOf(Props.create(TaskTrackerActor.class), RemoteConstant.Task_TRACKER_ACTOR_NAME); - actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); + actorSystem.actorOf(Props.create(TaskTrackerActor.class) + .withDispatcher("akka.task-tracker-dispatcher") + .withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME); + actorSystem.actorOf(Props.create(ProcessorTrackerActor.class) + .withDispatcher("akka.processor-tracker-dispatcher") + .withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); actorSystem.actorOf(Props.create(WorkerActor.class), RemoteConstant.WORKER_ACTOR_NAME); + // 处理系统中产生的异常情况 + ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(TroubleshootingActor.class), RemoteConstant.TROUBLESHOOTING_ACTOR_NAME); + actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class); + log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress); log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TroubleshootingActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TroubleshootingActor.java new file mode 100644 index 00000000..3b731b40 --- /dev/null +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TroubleshootingActor.java @@ -0,0 +1,25 @@ +package com.github.kfcfans.powerjob.worker.actors; + +import akka.actor.AbstractActor; +import akka.actor.DeadLetter; +import lombok.extern.slf4j.Slf4j; + +/** + * 处理系统异常的 Actor + * + * @author 朱八 + * @since 2020/7/16 + */ +@Slf4j +public class TroubleshootingActor extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .match(DeadLetter.class, this::onReceiveDeadLetter) + .build(); + } + + public void onReceiveDeadLetter(DeadLetter dl) { + log.warn("[IndianActor] receive DeadLetter: {}", dl); + } +} diff --git a/powerjob-worker/src/main/resources/oms-worker.akka.conf b/powerjob-worker/src/main/resources/oms-worker.akka.conf index 94bf50af..409eee9c 100644 --- a/powerjob-worker/src/main/resources/oms-worker.akka.conf +++ b/powerjob-worker/src/main/resources/oms-worker.akka.conf @@ -20,4 +20,36 @@ akka { canonical.port = 25520 } } + + # dispatcher + task-tracker-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 + } + + processor-tracker-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 2.0 + parallelism-max = 4 + } + throughput = 100 + } } \ No newline at end of file