From c6d90be839ee25b8bd2a695683620b92d017d883 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 31 Dec 2022 16:34:13 +0800 Subject: [PATCH] feat: add remote akka impl --- powerjob-remote/pom.xml | 2 + .../powerjob-remote-framework/pom.xml | 1 + .../remote/framework/base/Address.java | 4 + .../remote/framework/base/ServerType.java | 12 ++ .../powerjob/remote/framework/base/URL.java | 5 + .../framework/cs/CSInitializerConfig.java | 3 + .../remote/framework/engine/EngineConfig.java | 2 + .../engine/impl/PowerJobRemoteEngine.java | 5 +- .../powerjob-remote-impl-akka/pom.xml | 45 +++++++ .../remote/akka/AkkaCSInitializer.java | 75 ++++++++++++ .../powerjob/remote/akka/AkkaConstant.java | 36 ++++++ .../powerjob/remote/akka/AkkaProtocol.java | 16 +++ .../powerjob/remote/akka/AkkaTransporter.java | 81 +++++++++++++ .../remote/akka/AkkaTroubleshootingActor.java | 25 ++++ .../powerjob/remote/akka/package-info.java | 8 ++ .../src/main/resources/powerjob.akka.conf | 112 ++++++++++++++++++ .../powerjob-remote-impl-http/pom.xml | 20 ++++ 17 files changed, 451 insertions(+), 1 deletion(-) create mode 100644 powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/ServerType.java create mode 100644 powerjob-remote/powerjob-remote-impl-akka/pom.xml create mode 100644 powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java create mode 100644 powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaConstant.java create mode 100644 powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProtocol.java create mode 100644 powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java create mode 100644 powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTroubleshootingActor.java create mode 100644 powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/package-info.java create mode 100644 powerjob-remote/powerjob-remote-impl-akka/src/main/resources/powerjob.akka.conf create mode 100644 powerjob-remote/powerjob-remote-impl-http/pom.xml diff --git a/powerjob-remote/pom.xml b/powerjob-remote/pom.xml index 4e18ce31..ee3e7b8b 100644 --- a/powerjob-remote/pom.xml +++ b/powerjob-remote/pom.xml @@ -11,6 +11,8 @@ pom powerjob-remote-framework + powerjob-remote-impl-http + powerjob-remote-impl-akka powerjob-remote diff --git a/powerjob-remote/powerjob-remote-framework/pom.xml b/powerjob-remote/powerjob-remote-framework/pom.xml index 542fc0e5..146d671f 100644 --- a/powerjob-remote/powerjob-remote-framework/pom.xml +++ b/powerjob-remote/powerjob-remote-framework/pom.xml @@ -9,6 +9,7 @@ 4.0.0 + 4.2.0 powerjob-remote-framework diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java index 0ab57738..71edbc36 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java @@ -18,4 +18,8 @@ import java.io.Serializable; public class Address implements Serializable { private String host; private int port; + + public String toFullAddress() { + return String.format("%s:%d", host, port); + } } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/ServerType.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/ServerType.java new file mode 100644 index 00000000..2b57212f --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/ServerType.java @@ -0,0 +1,12 @@ +package tech.powerjob.remote.framework.base; + +/** + * 服务器类型类型 + * + * @author tjq + * @since 2022/12/31 + */ +public enum ServerType { + SERVER, + WORKER +} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/URL.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/URL.java index 98487800..24e71b64 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/URL.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/URL.java @@ -1,5 +1,8 @@ package tech.powerjob.remote.framework.base; +import lombok.Data; +import lombok.experimental.Accessors; + import java.io.Serializable; /** @@ -8,6 +11,8 @@ import java.io.Serializable; * @author tjq * @since 2022/12/31 */ +@Data +@Accessors(chain = true) public class URL implements Serializable { /** * remote address diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java index a48a9078..a64051fd 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java @@ -4,6 +4,7 @@ import lombok.Getter; import lombok.Setter; import lombok.experimental.Accessors; import tech.powerjob.remote.framework.base.Address; +import tech.powerjob.remote.framework.base.ServerType; import java.io.Serializable; @@ -19,4 +20,6 @@ import java.io.Serializable; public class CSInitializerConfig implements Serializable { private Address bindAddress; + + private ServerType serverType; } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java index 0e1efa05..4120f7ea 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java @@ -5,6 +5,7 @@ import lombok.Getter; import lombok.Setter; import lombok.experimental.Accessors; import tech.powerjob.remote.framework.base.Address; +import tech.powerjob.remote.framework.base.ServerType; import java.io.Serializable; import java.util.Set; @@ -19,6 +20,7 @@ import java.util.Set; @Accessors(chain = true) public class EngineConfig implements Serializable { + private ServerType serverType; /** * 需要启动的引擎类型 */ diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java index 9e890265..1ee8cc44 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java @@ -37,7 +37,10 @@ public class PowerJobRemoteEngine implements RemoteEngine { Stopwatch sw = Stopwatch.createStarted(); log.info("[PowerJobRemoteEngine] try to startup CSInitializer[type={}]", type); - csInitializer.init(new CSInitializerConfig().setBindAddress(engineConfig.getBindAddress())); + csInitializer.init(new CSInitializerConfig() + .setBindAddress(engineConfig.getBindAddress()) + .setServerType(engineConfig.getServerType()) + ); Transporter transporter = csInitializer.buildTransporter(); engineOutput.getType2Transport().put(type, transporter); diff --git a/powerjob-remote/powerjob-remote-impl-akka/pom.xml b/powerjob-remote/powerjob-remote-impl-akka/pom.xml new file mode 100644 index 00000000..e422b0c8 --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-akka/pom.xml @@ -0,0 +1,45 @@ + + + + powerjob-remote + tech.powerjob + 3.0.0 + + 4.0.0 + + powerjob-remote-impl-akka + 4.2.0 + + + 8 + 8 + UTF-8 + + 4.2.0 + + 2.6.12 + + + + + tech.powerjob + powerjob-remote-framework + ${powerjob-remote-framework.version} + + + + + com.typesafe.akka + akka-remote_2.13 + ${akka.version} + + + com.typesafe.akka + akka-slf4j_2.13 + ${akka.version} + + + + \ No newline at end of file diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java new file mode 100644 index 00000000..ae6e201c --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java @@ -0,0 +1,75 @@ +package tech.powerjob.remote.akka; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.DeadLetter; +import akka.actor.Props; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import tech.powerjob.remote.framework.actor.HandlerInfo; +import tech.powerjob.remote.framework.base.Address; +import tech.powerjob.remote.framework.base.ServerType; +import tech.powerjob.remote.framework.cs.CSInitializer; +import tech.powerjob.remote.framework.cs.CSInitializerConfig; +import tech.powerjob.remote.framework.transporter.Transporter; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * AkkaCSInitializer + * + * @author tjq + * @since 2022/12/31 + */ +public class AkkaCSInitializer implements CSInitializer { + + private ActorSystem actorSystem; + private CSInitializerConfig config; + + @Override + public String type() { + return AkkaConstant.PROTOCOL; + } + + @Override + public void init(CSInitializerConfig config) { + + this.config = config; + + Address bindAddress = config.getBindAddress(); + + // 初始化 ActorSystem(macOS上 new ServerSocket 检测端口占用的方法并不生效,可能是AKKA是Scala写的缘故?没办法...只能靠异常重试了) + Map overrideConfig = Maps.newHashMap(); + overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost()); + overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort()); + + Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG); + Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); + + // 启动时绑定当前的 actorSystemName + String actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType(), false); + this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig); + + // 处理系统中产生的异常情况 + ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(AkkaTroubleshootingActor.class), "troubleshooting"); + actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class); + } + + @Override + public Transporter buildTransporter() { + return new AkkaTransporter(config.getServerType(), actorSystem); + } + + @Override + public void bindHandlers(List handlerInfos) { + + } + + @Override + public void close() throws IOException { + actorSystem.terminate(); + } +} diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaConstant.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaConstant.java new file mode 100644 index 00000000..425f6981 --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaConstant.java @@ -0,0 +1,36 @@ +package tech.powerjob.remote.akka; + +import tech.powerjob.remote.framework.base.ServerType; + +/** + * AkkaConstant + * + * @author tjq + * @since 2022/12/31 + */ +public class AkkaConstant { + + public static final String PROTOCOL = "AKKA"; + + public static final String AKKA_CONFIG = "powerjob.akka.conf"; + + public static final String WORKER_ACTOR_SYSTEM_NAME = "oms"; + public static final String SERVER_ACTOR_SYSTEM_NAME = "oms-server"; + + /** + * 获取 actorSystem 名称 + * @param serverType 当前服务器类型,powerjob-server 为 server,powerjob-worker 为 worker + * @param reversed 是否反向输出,默认输出当前服务器对应的 actorSystemName,reversed = true 后倒置为目标服务器的 actorSystemName + * @return actorSystemName + */ + public static String fetchActorSystemName(ServerType serverType, boolean reversed) { + + boolean outputServer = serverType == ServerType.SERVER; + if (reversed) { + outputServer = !outputServer; + } + + return outputServer ? SERVER_ACTOR_SYSTEM_NAME : WORKER_ACTOR_SYSTEM_NAME; + } + +} diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProtocol.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProtocol.java new file mode 100644 index 00000000..e2e04c0b --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProtocol.java @@ -0,0 +1,16 @@ +package tech.powerjob.remote.akka; + +import tech.powerjob.remote.framework.transporter.Protocol; + +/** + * AkkaProtocol + * + * @author tjq + * @since 2022/12/31 + */ +public class AkkaProtocol implements Protocol { + @Override + public String name() { + return AkkaConstant.PROTOCOL; + } +} diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java new file mode 100644 index 00000000..b03358f8 --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java @@ -0,0 +1,81 @@ +package tech.powerjob.remote.akka; + +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.pattern.Patterns; +import com.google.common.collect.Maps; +import tech.powerjob.common.PowerSerializable; +import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.remote.framework.base.RemotingException; +import tech.powerjob.remote.framework.base.ServerType; +import tech.powerjob.remote.framework.base.URL; +import tech.powerjob.remote.framework.transporter.Protocol; +import tech.powerjob.remote.framework.transporter.Transporter; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; + +/** + * AkkaTransporter + * + * @author tjq + * @since 2022/12/31 + */ +public class AkkaTransporter implements Transporter { + + private final ServerType serverType; + private final ActorSystem actorSystem; + + private final String targetActorSystemName; + + /** + * akka://@:/ + */ + private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s"; + + private static final Map SERVER_PATH_MAP = Maps.newHashMap(); + private static final Map WORKER_PATH_MAP = Maps.newHashMap(); + + static { + SERVER_PATH_MAP.put("", ""); + + WORKER_PATH_MAP.put("", ""); + } + + public AkkaTransporter(ServerType serverType, ActorSystem actorSystem) { + this.actorSystem = actorSystem; + this.serverType = serverType; + this.targetActorSystemName = AkkaConstant.fetchActorSystemName(serverType, true); + } + + @Override + public Protocol getProtocol() { + return new AkkaProtocol(); + } + + @Override + public void tell(URL url, PowerSerializable request) { + ActorSelection actorSelection = fetchActorSelection(url); + actorSelection.tell(request, null); + } + + @Override + public CompletionStage ask(URL url, PowerSerializable request, ExecutorService executorService) throws RemotingException { + ActorSelection actorSelection = fetchActorSelection(url); + return Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + } + + private ActorSelection fetchActorSelection(URL url) { + + Map rootPath2ActorNameMap = serverType == ServerType.SERVER ? SERVER_PATH_MAP : WORKER_PATH_MAP; + final String actorName = rootPath2ActorNameMap.get(url.getLocation().getRootPath()); + CommonUtils.requireNonNull(actorName, "can't find actor by URL: " + url.getLocation()); + + String address = url.getAddress().toFullAddress(); + + return actorSystem.actorSelection(String.format(AKKA_NODE_PATH, targetActorSystemName, address, actorName)); + } +} diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTroubleshootingActor.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTroubleshootingActor.java new file mode 100644 index 00000000..bcec9c8a --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTroubleshootingActor.java @@ -0,0 +1,25 @@ +package tech.powerjob.remote.akka; + +import akka.actor.AbstractActor; +import akka.actor.DeadLetter; +import lombok.extern.slf4j.Slf4j; + +/** + * TroubleshootingActor + * + * @author tjq + * @since 2022/12/31 + */ +@Slf4j +public class AkkaTroubleshootingActor extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .match(DeadLetter.class, this::onReceiveDeadLetter) + .build(); + } + + public void onReceiveDeadLetter(DeadLetter dl) { + log.warn("[TroubleshootingActor] receive DeadLetter: {}", dl); + } +} diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/package-info.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/package-info.java new file mode 100644 index 00000000..c01678db --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/package-info.java @@ -0,0 +1,8 @@ +/** + * 由于 AKKA 后续转向收费运营模式,PowerJob 计划移除 akka 支持,因此不再维护该 module。 + * 如果存在任何使用上的问题,请切换到其他通讯协议(建议使用 netty) + * + * @author PowerJob发言人 + * @since 2022/12/31 + */ +package tech.powerjob.remote.akka; \ No newline at end of file diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/resources/powerjob.akka.conf b/powerjob-remote/powerjob-remote-impl-akka/src/main/resources/powerjob.akka.conf new file mode 100644 index 00000000..34b04a4c --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/resources/powerjob.akka.conf @@ -0,0 +1,112 @@ +akka { + + loggers = ["akka.event.slf4j.Slf4jLogger"] + loglevel = "WARNING" + + actor { + # cluster is better(recommend by official document), but I prefer remote + provider = remote + allow-java-serialization = off + + serializers { + power-serializer = "tech.powerjob.common.serialize.PowerAkkaSerializer" + } + + serialization-bindings { + "tech.powerjob.common.PowerSerializable" = power-serializer + } + } + remote { + artery { + transport = tcp # See Selecting a transport below + # over write by code + canonical.hostname = "127.0.0.1" + canonical.port = 25520 + } + } + + # dispatcher + task-tracker-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 4.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 64 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 10 + } + + processor-tracker-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 2.0 + parallelism-max = 64 + } + throughput = 10 + } + + worker-common-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 2.0 + parallelism-max = 8 + } + throughput = 10 + } + + ##################### server config ##################### + # worker-request-core-dispatcher + w-r-c-d { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 4.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 128 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 10 + } + + friend-request-actor-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 4.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 128 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 5 + } +} \ No newline at end of file diff --git a/powerjob-remote/powerjob-remote-impl-http/pom.xml b/powerjob-remote/powerjob-remote-impl-http/pom.xml new file mode 100644 index 00000000..621bbf4d --- /dev/null +++ b/powerjob-remote/powerjob-remote-impl-http/pom.xml @@ -0,0 +1,20 @@ + + + + powerjob-remote + tech.powerjob + 3.0.0 + + 4.0.0 + + powerjob-remote-impl-http + + + 8 + 8 + UTF-8 + + + \ No newline at end of file