mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
[dev] config akka dispatcher and router & add akka DeadLetter receiver
This commit is contained in:
parent
598b010250
commit
b53716bba4
@ -17,6 +17,7 @@ public class RemoteConstant {
|
|||||||
public static final String Task_TRACKER_ACTOR_NAME = "task_tracker";
|
public static final String Task_TRACKER_ACTOR_NAME = "task_tracker";
|
||||||
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
|
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
|
||||||
public static final String WORKER_ACTOR_NAME = "worker";
|
public static final String WORKER_ACTOR_NAME = "worker";
|
||||||
|
public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting";
|
||||||
|
|
||||||
public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf";
|
public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf";
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.akka;
|
|||||||
import akka.actor.ActorSelection;
|
import akka.actor.ActorSelection;
|
||||||
import akka.actor.ActorSystem;
|
import akka.actor.ActorSystem;
|
||||||
import akka.actor.Props;
|
import akka.actor.Props;
|
||||||
|
import akka.routing.RoundRobinPool;
|
||||||
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
||||||
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
||||||
import com.github.kfcfans.powerjob.server.akka.actors.FriendActor;
|
import com.github.kfcfans.powerjob.server.akka.actors.FriendActor;
|
||||||
@ -58,7 +59,9 @@ public class OhMyServer {
|
|||||||
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
|
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
|
||||||
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
|
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
|
||||||
|
|
||||||
actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME);
|
actorSystem.actorOf(Props.create(ServerActor.class)
|
||||||
|
.withDispatcher("akka.server-actor-dispatcher")
|
||||||
|
.withRouter(new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)), RemoteConstant.SERVER_ACTOR_NAME);
|
||||||
actorSystem.actorOf(Props.create(FriendActor.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
|
actorSystem.actorOf(Props.create(FriendActor.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
|
||||||
|
|
||||||
log.info("[OhMyServer] OhMyServer's akka system start successfully, using time {}.", stopwatch);
|
log.info("[OhMyServer] OhMyServer's akka system start successfully, using time {}.", stopwatch);
|
||||||
|
@ -20,4 +20,24 @@ akka {
|
|||||||
canonical.port = 0
|
canonical.port = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
server-actor-dispatcher {
|
||||||
|
# Dispatcher is the name of the event-based dispatcher
|
||||||
|
type = Dispatcher
|
||||||
|
# What kind of ExecutionService to use
|
||||||
|
executor = "fork-join-executor"
|
||||||
|
# Configuration for the fork join pool
|
||||||
|
fork-join-executor {
|
||||||
|
# Min number of threads to cap factor-based parallelism number to
|
||||||
|
parallelism-min = 2
|
||||||
|
# Parallelism (threads) ... ceil(available processors * factor)
|
||||||
|
parallelism-factor = 2.0
|
||||||
|
# Max number of threads to cap factor-based parallelism number to
|
||||||
|
parallelism-max = 10
|
||||||
|
}
|
||||||
|
# Throughput defines the maximum number of messages to be
|
||||||
|
# processed per actor before the thread jumps to the next actor.
|
||||||
|
# Set to 1 for as fair as possible.
|
||||||
|
throughput = 100
|
||||||
|
}
|
||||||
}
|
}
|
@ -1,7 +1,10 @@
|
|||||||
package com.github.kfcfans.powerjob.worker;
|
package com.github.kfcfans.powerjob.worker;
|
||||||
|
|
||||||
|
import akka.actor.ActorRef;
|
||||||
import akka.actor.ActorSystem;
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.actor.DeadLetter;
|
||||||
import akka.actor.Props;
|
import akka.actor.Props;
|
||||||
|
import akka.routing.RoundRobinPool;
|
||||||
import com.github.kfcfans.powerjob.common.OmsException;
|
import com.github.kfcfans.powerjob.common.OmsException;
|
||||||
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
||||||
import com.github.kfcfans.powerjob.common.response.ResultDTO;
|
import com.github.kfcfans.powerjob.common.response.ResultDTO;
|
||||||
@ -9,6 +12,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils;
|
|||||||
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
|
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
|
||||||
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
|
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
|
||||||
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
||||||
|
import com.github.kfcfans.powerjob.worker.actors.TroubleshootingActor;
|
||||||
import com.github.kfcfans.powerjob.worker.actors.ProcessorTrackerActor;
|
import com.github.kfcfans.powerjob.worker.actors.ProcessorTrackerActor;
|
||||||
import com.github.kfcfans.powerjob.worker.actors.TaskTrackerActor;
|
import com.github.kfcfans.powerjob.worker.actors.TaskTrackerActor;
|
||||||
import com.github.kfcfans.powerjob.worker.actors.WorkerActor;
|
import com.github.kfcfans.powerjob.worker.actors.WorkerActor;
|
||||||
@ -93,11 +97,20 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
|
|||||||
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME);
|
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME);
|
||||||
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
|
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
|
||||||
|
|
||||||
|
int cores = Runtime.getRuntime().availableProcessors();
|
||||||
actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
|
actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
|
||||||
actorSystem.actorOf(Props.create(TaskTrackerActor.class), RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
actorSystem.actorOf(Props.create(TaskTrackerActor.class)
|
||||||
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
.withDispatcher("akka.task-tracker-dispatcher")
|
||||||
|
.withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
||||||
|
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class)
|
||||||
|
.withDispatcher("akka.processor-tracker-dispatcher")
|
||||||
|
.withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
||||||
actorSystem.actorOf(Props.create(WorkerActor.class), RemoteConstant.WORKER_ACTOR_NAME);
|
actorSystem.actorOf(Props.create(WorkerActor.class), RemoteConstant.WORKER_ACTOR_NAME);
|
||||||
|
|
||||||
|
// 处理系统中产生的异常情况
|
||||||
|
ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(TroubleshootingActor.class), RemoteConstant.TROUBLESHOOTING_ACTOR_NAME);
|
||||||
|
actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
|
||||||
|
|
||||||
log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress);
|
log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress);
|
||||||
log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
|
log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
|
||||||
|
|
||||||
|
@ -0,0 +1,25 @@
|
|||||||
|
package com.github.kfcfans.powerjob.worker.actors;
|
||||||
|
|
||||||
|
import akka.actor.AbstractActor;
|
||||||
|
import akka.actor.DeadLetter;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理系统异常的 Actor
|
||||||
|
*
|
||||||
|
* @author 朱八
|
||||||
|
* @since 2020/7/16
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class TroubleshootingActor extends AbstractActor {
|
||||||
|
@Override
|
||||||
|
public Receive createReceive() {
|
||||||
|
return receiveBuilder()
|
||||||
|
.match(DeadLetter.class, this::onReceiveDeadLetter)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onReceiveDeadLetter(DeadLetter dl) {
|
||||||
|
log.warn("[IndianActor] receive DeadLetter: {}", dl);
|
||||||
|
}
|
||||||
|
}
|
@ -20,4 +20,36 @@ akka {
|
|||||||
canonical.port = 25520
|
canonical.port = 25520
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# dispatcher
|
||||||
|
task-tracker-dispatcher {
|
||||||
|
# Dispatcher is the name of the event-based dispatcher
|
||||||
|
type = Dispatcher
|
||||||
|
# What kind of ExecutionService to use
|
||||||
|
executor = "fork-join-executor"
|
||||||
|
# Configuration for the fork join pool
|
||||||
|
fork-join-executor {
|
||||||
|
# Min number of threads to cap factor-based parallelism number to
|
||||||
|
parallelism-min = 2
|
||||||
|
# Parallelism (threads) ... ceil(available processors * factor)
|
||||||
|
parallelism-factor = 2.0
|
||||||
|
# Max number of threads to cap factor-based parallelism number to
|
||||||
|
parallelism-max = 10
|
||||||
|
}
|
||||||
|
# Throughput defines the maximum number of messages to be
|
||||||
|
# processed per actor before the thread jumps to the next actor.
|
||||||
|
# Set to 1 for as fair as possible.
|
||||||
|
throughput = 100
|
||||||
|
}
|
||||||
|
|
||||||
|
processor-tracker-dispatcher {
|
||||||
|
type = Dispatcher
|
||||||
|
executor = "fork-join-executor"
|
||||||
|
fork-join-executor {
|
||||||
|
parallelism-min = 2
|
||||||
|
parallelism-factor = 2.0
|
||||||
|
parallelism-max = 4
|
||||||
|
}
|
||||||
|
throughput = 100
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user