feat: add remote akka impl

This commit is contained in:
tjq 2022-12-31 16:34:13 +08:00
parent 4356c5566d
commit c6d90be839
17 changed files with 451 additions and 1 deletions

View File

@ -11,6 +11,8 @@
<packaging>pom</packaging>
<modules>
<module>powerjob-remote-framework</module>
<module>powerjob-remote-impl-http</module>
<module>powerjob-remote-impl-akka</module>
</modules>
<artifactId>powerjob-remote</artifactId>

View File

@ -9,6 +9,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<version>4.2.0</version>
<artifactId>powerjob-remote-framework</artifactId>
<properties>

View File

@ -18,4 +18,8 @@ import java.io.Serializable;
public class Address implements Serializable {
private String host;
private int port;
public String toFullAddress() {
return String.format("%s:%d", host, port);
}
}

View File

@ -0,0 +1,12 @@
package tech.powerjob.remote.framework.base;
/**
* 服务器类型类型
*
* @author tjq
* @since 2022/12/31
*/
public enum ServerType {
SERVER,
WORKER
}

View File

@ -1,5 +1,8 @@
package tech.powerjob.remote.framework.base;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
@ -8,6 +11,8 @@ import java.io.Serializable;
* @author tjq
* @since 2022/12/31
*/
@Data
@Accessors(chain = true)
public class URL implements Serializable {
/**
* remote address

View File

@ -4,6 +4,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.ServerType;
import java.io.Serializable;
@ -19,4 +20,6 @@ import java.io.Serializable;
public class CSInitializerConfig implements Serializable {
private Address bindAddress;
private ServerType serverType;
}

View File

@ -5,6 +5,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.ServerType;
import java.io.Serializable;
import java.util.Set;
@ -19,6 +20,7 @@ import java.util.Set;
@Accessors(chain = true)
public class EngineConfig implements Serializable {
private ServerType serverType;
/**
* 需要启动的引擎类型
*/

View File

@ -37,7 +37,10 @@ public class PowerJobRemoteEngine implements RemoteEngine {
Stopwatch sw = Stopwatch.createStarted();
log.info("[PowerJobRemoteEngine] try to startup CSInitializer[type={}]", type);
csInitializer.init(new CSInitializerConfig().setBindAddress(engineConfig.getBindAddress()));
csInitializer.init(new CSInitializerConfig()
.setBindAddress(engineConfig.getBindAddress())
.setServerType(engineConfig.getServerType())
);
Transporter transporter = csInitializer.buildTransporter();
engineOutput.getType2Transport().put(type, transporter);

View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>3.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-akka</artifactId>
<version>4.2.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<powerjob-remote-framework.version>4.2.0</powerjob-remote-framework.version>
<akka.version>2.6.12</akka.version>
</properties>
<dependencies>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-remote-framework</artifactId>
<version>${powerjob-remote-framework.version}</version>
</dependency>
<!-- akka remote -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,75 @@
package tech.powerjob.remote.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
import akka.actor.Props;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import tech.powerjob.remote.framework.actor.HandlerInfo;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* AkkaCSInitializer
*
* @author tjq
* @since 2022/12/31
*/
public class AkkaCSInitializer implements CSInitializer {
private ActorSystem actorSystem;
private CSInitializerConfig config;
@Override
public String type() {
return AkkaConstant.PROTOCOL;
}
@Override
public void init(CSInitializerConfig config) {
this.config = config;
Address bindAddress = config.getBindAddress();
// 初始化 ActorSystemmacOS上 new ServerSocket 检测端口占用的方法并不生效可能是AKKA是Scala写的缘故没办法...只能靠异常重试了
Map<String, Object> overrideConfig = Maps.newHashMap();
overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost());
overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort());
Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
// 启动时绑定当前的 actorSystemName
String actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType(), false);
this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig);
// 处理系统中产生的异常情况
ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(AkkaTroubleshootingActor.class), "troubleshooting");
actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
}
@Override
public Transporter buildTransporter() {
return new AkkaTransporter(config.getServerType(), actorSystem);
}
@Override
public void bindHandlers(List<HandlerInfo> handlerInfos) {
}
@Override
public void close() throws IOException {
actorSystem.terminate();
}
}

View File

@ -0,0 +1,36 @@
package tech.powerjob.remote.akka;
import tech.powerjob.remote.framework.base.ServerType;
/**
* AkkaConstant
*
* @author tjq
* @since 2022/12/31
*/
public class AkkaConstant {
public static final String PROTOCOL = "AKKA";
public static final String AKKA_CONFIG = "powerjob.akka.conf";
public static final String WORKER_ACTOR_SYSTEM_NAME = "oms";
public static final String SERVER_ACTOR_SYSTEM_NAME = "oms-server";
/**
* 获取 actorSystem 名称
* @param serverType 当前服务器类型powerjob-server serverpowerjob-worker worker
* @param reversed 是否反向输出默认输出当前服务器对应的 actorSystemNamereversed = true 后倒置为目标服务器的 actorSystemName
* @return actorSystemName
*/
public static String fetchActorSystemName(ServerType serverType, boolean reversed) {
boolean outputServer = serverType == ServerType.SERVER;
if (reversed) {
outputServer = !outputServer;
}
return outputServer ? SERVER_ACTOR_SYSTEM_NAME : WORKER_ACTOR_SYSTEM_NAME;
}
}

View File

@ -0,0 +1,16 @@
package tech.powerjob.remote.akka;
import tech.powerjob.remote.framework.transporter.Protocol;
/**
* AkkaProtocol
*
* @author tjq
* @since 2022/12/31
*/
public class AkkaProtocol implements Protocol {
@Override
public String name() {
return AkkaConstant.PROTOCOL;
}
}

View File

@ -0,0 +1,81 @@
package tech.powerjob.remote.akka;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import com.google.common.collect.Maps;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.transporter.Protocol;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
/**
* AkkaTransporter
*
* @author tjq
* @since 2022/12/31
*/
public class AkkaTransporter implements Transporter {
private final ServerType serverType;
private final ActorSystem actorSystem;
private final String targetActorSystemName;
/**
* akka://<actor system>@<hostname>:<port>/<actor path>
*/
private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";
private static final Map<String, String> SERVER_PATH_MAP = Maps.newHashMap();
private static final Map<String, String> WORKER_PATH_MAP = Maps.newHashMap();
static {
SERVER_PATH_MAP.put("", "");
WORKER_PATH_MAP.put("", "");
}
public AkkaTransporter(ServerType serverType, ActorSystem actorSystem) {
this.actorSystem = actorSystem;
this.serverType = serverType;
this.targetActorSystemName = AkkaConstant.fetchActorSystemName(serverType, true);
}
@Override
public Protocol getProtocol() {
return new AkkaProtocol();
}
@Override
public void tell(URL url, PowerSerializable request) {
ActorSelection actorSelection = fetchActorSelection(url);
actorSelection.tell(request, null);
}
@Override
public CompletionStage<Object> ask(URL url, PowerSerializable request, ExecutorService executorService) throws RemotingException {
ActorSelection actorSelection = fetchActorSelection(url);
return Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
}
private ActorSelection fetchActorSelection(URL url) {
Map<String, String> rootPath2ActorNameMap = serverType == ServerType.SERVER ? SERVER_PATH_MAP : WORKER_PATH_MAP;
final String actorName = rootPath2ActorNameMap.get(url.getLocation().getRootPath());
CommonUtils.requireNonNull(actorName, "can't find actor by URL: " + url.getLocation());
String address = url.getAddress().toFullAddress();
return actorSystem.actorSelection(String.format(AKKA_NODE_PATH, targetActorSystemName, address, actorName));
}
}

View File

@ -0,0 +1,25 @@
package tech.powerjob.remote.akka;
import akka.actor.AbstractActor;
import akka.actor.DeadLetter;
import lombok.extern.slf4j.Slf4j;
/**
* TroubleshootingActor
*
* @author tjq
* @since 2022/12/31
*/
@Slf4j
public class AkkaTroubleshootingActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(DeadLetter.class, this::onReceiveDeadLetter)
.build();
}
public void onReceiveDeadLetter(DeadLetter dl) {
log.warn("[TroubleshootingActor] receive DeadLetter: {}", dl);
}
}

View File

@ -0,0 +1,8 @@
/**
* 由于 AKKA 后续转向收费运营模式PowerJob 计划移除 akka 支持因此不再维护该 module
* 如果存在任何使用上的问题请切换到其他通讯协议建议使用 netty
*
* @author PowerJob发言人
* @since 2022/12/31
*/
package tech.powerjob.remote.akka;

View File

@ -0,0 +1,112 @@
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "WARNING"
actor {
# cluster is better(recommend by official document), but I prefer remote
provider = remote
allow-java-serialization = off
serializers {
power-serializer = "tech.powerjob.common.serialize.PowerAkkaSerializer"
}
serialization-bindings {
"tech.powerjob.common.PowerSerializable" = power-serializer
}
}
remote {
artery {
transport = tcp # See Selecting a transport below
# over write by code
canonical.hostname = "127.0.0.1"
canonical.port = 25520
}
}
# dispatcher
task-tracker-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 4.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 64
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
processor-tracker-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 64
}
throughput = 10
}
worker-common-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 8
}
throughput = 10
}
##################### server config #####################
# worker-request-core-dispatcher
w-r-c-d {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 4.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 128
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
friend-request-actor-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 4.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 128
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 5
}
}

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>3.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-http</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>