perf: optimize akka config

This commit is contained in:
Echo009 2022-08-21 00:11:17 +08:00
parent 5ed6eac38a
commit 08711f93d0
5 changed files with 136 additions and 50 deletions

View File

@ -1,34 +1,32 @@
package tech.powerjob.server.core.handler; package tech.powerjob.server.core.handler;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.request.*;
import tech.powerjob.server.core.handler.impl.WorkerRequestAkkaHandler;
import tech.powerjob.server.core.handler.impl.WorkerRequestHttpHandler;
import tech.powerjob.server.core.instance.InstanceLogService;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.remote.transport.starter.VertXStarter;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.handler.impl.WorkerRequestAkkaHandler;
import tech.powerjob.server.core.handler.impl.WorkerRequestHttpHandler;
import tech.powerjob.server.core.instance.InstanceLogService;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.remote.transport.starter.VertXStarter;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -66,9 +64,7 @@ public class WorkerRequestHandler {
@PostConstruct @PostConstruct
public void initHandler() { public void initHandler() {
// init akka // init akka
AkkaStarter.actorSystem.actorOf(Props.create(WorkerRequestAkkaHandler.class) AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME);
.withDispatcher("akka.server-actor-dispatcher")
.withRouter(new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)), RemoteConstant.SERVER_ACTOR_NAME);
// init vert.x // init vert.x
VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler()); VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler());
} }

View File

@ -1,6 +1,9 @@
package tech.powerjob.server.core.handler.impl; package tech.powerjob.server.core.handler.impl;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.routing.DefaultResizer;
import akka.routing.RoundRobinPool;
import tech.powerjob.common.request.*; import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.response.AskResponse;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -18,6 +21,24 @@ import static tech.powerjob.server.core.handler.WorkerRequestHandler.getWorkerRe
@Slf4j @Slf4j
public class WorkerRequestAkkaHandler extends AbstractActor { public class WorkerRequestAkkaHandler extends AbstractActor {
public static Props defaultProps(){
return Props.create(WorkerRequestAkkaHandler.class)
.withDispatcher("akka.worker-request-actor-dispatcher")
.withRouter(
new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)
.withResizer(new DefaultResizer(
Runtime.getRuntime().availableProcessors() * 4,
Runtime.getRuntime().availableProcessors() * 10,
1,
0.2d,
0.3d,
0.1d,
10
))
);
}
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
@ -30,8 +51,19 @@ public class WorkerRequestAkkaHandler extends AbstractActor {
.build(); .build();
} }
@Override
public void preStart() throws Exception {
super.preStart();
log.debug("[WorkerRequestAkkaHandler]init WorkerRequestActor");
}
@Override
public void postStop() throws Exception {
super.postStop();
log.debug("[WorkerRequestAkkaHandler]stop WorkerRequestActor");
}
/** /**
* 处理 instance 状态 * 处理 instance 状态
* @param req 任务实例的状态上报请求 * @param req 任务实例的状态上报请求

View File

@ -1,14 +1,17 @@
package tech.powerjob.server.remote.server; package tech.powerjob.server.remote.server;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.routing.DefaultResizer;
import akka.routing.RoundRobinPool;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.server.remote.server.election.Ping; import tech.powerjob.server.remote.server.election.Ping;
import tech.powerjob.server.remote.server.redirector.RemoteProcessReq; import tech.powerjob.server.remote.server.redirector.RemoteProcessReq;
import tech.powerjob.server.remote.server.redirector.RemoteRequestProcessor; import tech.powerjob.server.remote.server.redirector.RemoteRequestProcessor;
import tech.powerjob.server.remote.transport.TransportService; import tech.powerjob.server.remote.transport.TransportService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
/** /**
* 处理朋友们的信息处理服务器与服务器之间的通讯 * 处理朋友们的信息处理服务器与服务器之间的通讯
@ -18,6 +21,25 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
*/ */
@Slf4j @Slf4j
public class FriendRequestHandler extends AbstractActor { public class FriendRequestHandler extends AbstractActor {
public static Props defaultProps() {
return Props.create(FriendRequestHandler.class)
.withDispatcher("akka.friend-request-actor-dispatcher")
.withRouter(
new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)
.withResizer(new DefaultResizer(
Runtime.getRuntime().availableProcessors() * 4,
Runtime.getRuntime().availableProcessors() * 10,
1,
0.2d,
0.3d,
0.1d,
10
))
);
}
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
@ -27,6 +49,20 @@ public class FriendRequestHandler extends AbstractActor {
.build(); .build();
} }
@Override
public void preStart() throws Exception {
super.preStart();
log.debug("[FriendRequestHandler]init FriendRequestActor");
}
@Override
public void postStop() throws Exception {
super.postStop();
log.debug("[FriendRequestHandler]stop FriendRequestActor");
}
/** /**
* 处理存活检测的请求 * 处理存活检测的请求
*/ */

View File

@ -2,13 +2,6 @@ package tech.powerjob.server.remote.transport.starter;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import tech.powerjob.server.common.utils.PropertyUtils;
import tech.powerjob.server.remote.server.FriendRequestHandler;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.typesafe.config.Config; import com.typesafe.config.Config;
@ -16,6 +9,12 @@ import com.typesafe.config.ConfigFactory;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import tech.powerjob.server.common.utils.PropertyUtils;
import tech.powerjob.server.remote.server.FriendRequestHandler;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
@ -44,33 +43,36 @@ public class AkkaStarter {
// TimeUtils.check(); // TimeUtils.check();
// 解析配置文件 // 解析配置文件
Config akkaFinalConfig = parseConfig();
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(FriendRequestHandler.defaultProps(), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
log.info("[PowerJob] PowerJob's akka system started successfully, using time {}.", stopwatch);
}
private static Config parseConfig() {
Properties properties = PropertyUtils.getProperties(); Properties properties = PropertyUtils.getProperties();
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.AKKA_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_AKKA_PORT))); int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.AKKA_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_AKKA_PORT)));
String portFromJVM = System.getProperty(PowerJobServerConfigKey.AKKA_PORT); String portFromJvm = System.getProperty(PowerJobServerConfigKey.AKKA_PORT);
if (StringUtils.isNotEmpty(portFromJVM)) { if (StringUtils.isNotEmpty(portFromJvm)) {
log.info("[PowerJob] use port from jvm params: {}", portFromJVM); log.info("[PowerJob] use port from jvm params: {}", portFromJvm);
port = Integer.parseInt(portFromJVM); port = Integer.parseInt(portFromJvm);
} }
// 启动 ActorSystem // 启动 ActorSystem
Map<String, Object> overrideConfig = Maps.newHashMap(); Map<String, Object> overrideConfig = Maps.newHashMap();
String localIP = NetUtils.getLocalHost(); String localIp = NetUtils.getLocalHost();
overrideConfig.put("akka.remote.artery.canonical.hostname", localIP); overrideConfig.put("akka.remote.artery.canonical.hostname", localIp);
overrideConfig.put("akka.remote.artery.canonical.port", port); overrideConfig.put("akka.remote.artery.canonical.port", port);
actorSystemAddress = localIP + ":" + port; actorSystemAddress = localIp + ":" + port;
log.info("[PowerJob] akka-remote server address: {}", actorSystemAddress); log.info("[PowerJob] akka-remote server address: {}", actorSystemAddress);
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.SERVER_AKKA_CONFIG_NAME); Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.SERVER_AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); return ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(FriendRequestHandler.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
log.info("[PowerJob] PowerJob's akka system started successfully, using time {}.", stopwatch);
} }
/** /**
* 获取 ServerActor ActorSelection * 获取 ServerActor ActorSelection
*
* @param address IP:port * @param address IP:port
* @return ActorSelection * @return ActorSelection
*/ */

View File

@ -25,7 +25,7 @@ akka {
} }
} }
server-actor-dispatcher { worker-request-actor-dispatcher {
# Dispatcher is the name of the event-based dispatcher # Dispatcher is the name of the event-based dispatcher
type = Dispatcher type = Dispatcher
# What kind of ExecutionService to use # What kind of ExecutionService to use
@ -44,4 +44,24 @@ akka {
# Set to 1 for as fair as possible. # Set to 1 for as fair as possible.
throughput = 10 throughput = 10
} }
friend-request-actor-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 4.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 128
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 5
}
} }