From 08711f93d004e4e4b9e05c22b396bde7c41af784 Mon Sep 17 00:00:00 2001 From: Echo009 Date: Sun, 21 Aug 2022 00:11:17 +0800 Subject: [PATCH] perf: optimize akka config --- .../core/handler/WorkerRequestHandler.java | 50 +++++++++---------- .../impl/WorkerRequestAkkaHandler.java | 32 ++++++++++++ .../remote/server/FriendRequestHandler.java | 40 ++++++++++++++- .../remote/transport/starter/AkkaStarter.java | 42 ++++++++-------- .../src/main/resources/oms-server.akka.conf | 22 +++++++- 5 files changed, 136 insertions(+), 50 deletions(-) diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandler.java index 693ec232..ed5787a6 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandler.java @@ -1,34 +1,32 @@ package tech.powerjob.server.core.handler; -import akka.actor.Props; -import akka.routing.RoundRobinPool; -import tech.powerjob.common.enums.InstanceStatus; -import tech.powerjob.common.RemoteConstant; -import tech.powerjob.common.request.*; -import tech.powerjob.server.core.handler.impl.WorkerRequestAkkaHandler; -import tech.powerjob.server.core.handler.impl.WorkerRequestHttpHandler; -import tech.powerjob.server.core.instance.InstanceLogService; -import tech.powerjob.server.core.instance.InstanceManager; -import tech.powerjob.server.core.workflow.WorkflowInstanceManager; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; -import tech.powerjob.server.remote.transport.starter.VertXStarter; -import tech.powerjob.server.remote.worker.WorkerClusterQueryService; -import tech.powerjob.server.common.module.WorkerInfo; -import tech.powerjob.common.response.AskResponse; -import tech.powerjob.common.serialize.JsonUtils; -import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.common.utils.SpringUtils; -import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; -import tech.powerjob.server.persistence.remote.model.JobInfoDO; -import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; -import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; -import tech.powerjob.server.remote.worker.WorkerClusterManagerService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.request.*; +import tech.powerjob.common.response.AskResponse; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.module.WorkerInfo; +import tech.powerjob.server.common.utils.SpringUtils; +import tech.powerjob.server.core.handler.impl.WorkerRequestAkkaHandler; +import tech.powerjob.server.core.handler.impl.WorkerRequestHttpHandler; +import tech.powerjob.server.core.instance.InstanceLogService; +import tech.powerjob.server.core.instance.InstanceManager; +import tech.powerjob.server.core.workflow.WorkflowInstanceManager; +import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; +import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; +import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; +import tech.powerjob.server.remote.transport.starter.AkkaStarter; +import tech.powerjob.server.remote.transport.starter.VertXStarter; +import tech.powerjob.server.remote.worker.WorkerClusterManagerService; +import tech.powerjob.server.remote.worker.WorkerClusterQueryService; import javax.annotation.PostConstruct; import javax.annotation.Resource; @@ -66,9 +64,7 @@ public class WorkerRequestHandler { @PostConstruct public void initHandler() { // init akka - AkkaStarter.actorSystem.actorOf(Props.create(WorkerRequestAkkaHandler.class) - .withDispatcher("akka.server-actor-dispatcher") - .withRouter(new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)), RemoteConstant.SERVER_ACTOR_NAME); + AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME); // init vert.x VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler()); } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java index 8dd02e43..99aa0cd3 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java @@ -1,6 +1,9 @@ package tech.powerjob.server.core.handler.impl; import akka.actor.AbstractActor; +import akka.actor.Props; +import akka.routing.DefaultResizer; +import akka.routing.RoundRobinPool; import tech.powerjob.common.request.*; import tech.powerjob.common.response.AskResponse; import lombok.extern.slf4j.Slf4j; @@ -18,6 +21,24 @@ import static tech.powerjob.server.core.handler.WorkerRequestHandler.getWorkerRe @Slf4j public class WorkerRequestAkkaHandler extends AbstractActor { + + public static Props defaultProps(){ + return Props.create(WorkerRequestAkkaHandler.class) + .withDispatcher("akka.worker-request-actor-dispatcher") + .withRouter( + new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4) + .withResizer(new DefaultResizer( + Runtime.getRuntime().availableProcessors() * 4, + Runtime.getRuntime().availableProcessors() * 10, + 1, + 0.2d, + 0.3d, + 0.1d, + 10 + )) + ); + } + @Override public Receive createReceive() { return receiveBuilder() @@ -30,8 +51,19 @@ public class WorkerRequestAkkaHandler extends AbstractActor { .build(); } + @Override + public void preStart() throws Exception { + super.preStart(); + log.debug("[WorkerRequestAkkaHandler]init WorkerRequestActor"); + } + @Override + public void postStop() throws Exception { + super.postStop(); + log.debug("[WorkerRequestAkkaHandler]stop WorkerRequestActor"); + } + /** * 处理 instance 状态 * @param req 任务实例的状态上报请求 diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendRequestHandler.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendRequestHandler.java index bbfeecf2..edda488e 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendRequestHandler.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendRequestHandler.java @@ -1,14 +1,17 @@ package tech.powerjob.server.remote.server; import akka.actor.AbstractActor; +import akka.actor.Props; +import akka.routing.DefaultResizer; +import akka.routing.RoundRobinPool; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.server.remote.server.election.Ping; import tech.powerjob.server.remote.server.redirector.RemoteProcessReq; import tech.powerjob.server.remote.server.redirector.RemoteRequestProcessor; import tech.powerjob.server.remote.transport.TransportService; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.exception.ExceptionUtils; /** * 处理朋友们的信息(处理服务器与服务器之间的通讯) @@ -18,6 +21,25 @@ import org.apache.commons.lang3.exception.ExceptionUtils; */ @Slf4j public class FriendRequestHandler extends AbstractActor { + + + public static Props defaultProps() { + return Props.create(FriendRequestHandler.class) + .withDispatcher("akka.friend-request-actor-dispatcher") + .withRouter( + new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4) + .withResizer(new DefaultResizer( + Runtime.getRuntime().availableProcessors() * 4, + Runtime.getRuntime().availableProcessors() * 10, + 1, + 0.2d, + 0.3d, + 0.1d, + 10 + )) + ); + } + @Override public Receive createReceive() { return receiveBuilder() @@ -27,6 +49,20 @@ public class FriendRequestHandler extends AbstractActor { .build(); } + + @Override + public void preStart() throws Exception { + super.preStart(); + log.debug("[FriendRequestHandler]init FriendRequestActor"); + } + + + @Override + public void postStop() throws Exception { + super.postStop(); + log.debug("[FriendRequestHandler]stop FriendRequestActor"); + } + /** * 处理存活检测的请求 */ diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/starter/AkkaStarter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/starter/AkkaStarter.java index b456e54b..de54e273 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/starter/AkkaStarter.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/starter/AkkaStarter.java @@ -2,13 +2,6 @@ package tech.powerjob.server.remote.transport.starter; import akka.actor.ActorSelection; import akka.actor.ActorSystem; -import akka.actor.Props; -import tech.powerjob.common.OmsConstant; -import tech.powerjob.common.RemoteConstant; -import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.server.common.PowerJobServerConfigKey; -import tech.powerjob.server.common.utils.PropertyUtils; -import tech.powerjob.server.remote.server.FriendRequestHandler; import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -16,6 +9,12 @@ import com.typesafe.config.ConfigFactory; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import tech.powerjob.common.OmsConstant; +import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.server.common.PowerJobServerConfigKey; +import tech.powerjob.server.common.utils.PropertyUtils; +import tech.powerjob.server.remote.server.FriendRequestHandler; import java.util.Map; import java.util.Properties; @@ -44,33 +43,36 @@ public class AkkaStarter { // TimeUtils.check(); // 解析配置文件 + Config akkaFinalConfig = parseConfig(); + actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig); + actorSystem.actorOf(FriendRequestHandler.defaultProps(), RemoteConstant.SERVER_FRIEND_ACTOR_NAME); + log.info("[PowerJob] PowerJob's akka system started successfully, using time {}.", stopwatch); + } + + private static Config parseConfig() { Properties properties = PropertyUtils.getProperties(); int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.AKKA_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_AKKA_PORT))); - String portFromJVM = System.getProperty(PowerJobServerConfigKey.AKKA_PORT); - if (StringUtils.isNotEmpty(portFromJVM)) { - log.info("[PowerJob] use port from jvm params: {}", portFromJVM); - port = Integer.parseInt(portFromJVM); + String portFromJvm = System.getProperty(PowerJobServerConfigKey.AKKA_PORT); + if (StringUtils.isNotEmpty(portFromJvm)) { + log.info("[PowerJob] use port from jvm params: {}", portFromJvm); + port = Integer.parseInt(portFromJvm); } // 启动 ActorSystem Map overrideConfig = Maps.newHashMap(); - String localIP = NetUtils.getLocalHost(); - overrideConfig.put("akka.remote.artery.canonical.hostname", localIP); + String localIp = NetUtils.getLocalHost(); + overrideConfig.put("akka.remote.artery.canonical.hostname", localIp); overrideConfig.put("akka.remote.artery.canonical.port", port); - actorSystemAddress = localIP + ":" + port; + actorSystemAddress = localIp + ":" + port; log.info("[PowerJob] akka-remote server address: {}", actorSystemAddress); Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.SERVER_AKKA_CONFIG_NAME); - Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); - actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig); - - actorSystem.actorOf(Props.create(FriendRequestHandler.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME); - - log.info("[PowerJob] PowerJob's akka system started successfully, using time {}.", stopwatch); + return ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); } /** * 获取 ServerActor 的 ActorSelection + * * @param address IP:port * @return ActorSelection */ diff --git a/powerjob-server/powerjob-server-starter/src/main/resources/oms-server.akka.conf b/powerjob-server/powerjob-server-starter/src/main/resources/oms-server.akka.conf index 00afa479..7e2274cd 100644 --- a/powerjob-server/powerjob-server-starter/src/main/resources/oms-server.akka.conf +++ b/powerjob-server/powerjob-server-starter/src/main/resources/oms-server.akka.conf @@ -25,7 +25,7 @@ akka { } } - server-actor-dispatcher { + worker-request-actor-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use @@ -44,4 +44,24 @@ akka { # Set to 1 for as fair as possible. throughput = 10 } + + friend-request-actor-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 4.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 128 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 5 + } } \ No newline at end of file