diff --git a/powerjob-remote/pom.xml b/powerjob-remote/pom.xml
index 4e18ce31..ee3e7b8b 100644
--- a/powerjob-remote/pom.xml
+++ b/powerjob-remote/pom.xml
@@ -11,6 +11,8 @@
pom
powerjob-remote-framework
+ powerjob-remote-impl-http
+ powerjob-remote-impl-akka
powerjob-remote
diff --git a/powerjob-remote/powerjob-remote-framework/pom.xml b/powerjob-remote/powerjob-remote-framework/pom.xml
index 542fc0e5..146d671f 100644
--- a/powerjob-remote/powerjob-remote-framework/pom.xml
+++ b/powerjob-remote/powerjob-remote-framework/pom.xml
@@ -9,6 +9,7 @@
4.0.0
+ 4.2.0
powerjob-remote-framework
diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java
index 0ab57738..71edbc36 100644
--- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java
+++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java
@@ -18,4 +18,8 @@ import java.io.Serializable;
public class Address implements Serializable {
private String host;
private int port;
+
+ public String toFullAddress() {
+ return String.format("%s:%d", host, port);
+ }
}
diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/ServerType.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/ServerType.java
new file mode 100644
index 00000000..2b57212f
--- /dev/null
+++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/ServerType.java
@@ -0,0 +1,12 @@
+package tech.powerjob.remote.framework.base;
+
+/**
+ * 服务器类型类型
+ *
+ * @author tjq
+ * @since 2022/12/31
+ */
+public enum ServerType {
+ SERVER,
+ WORKER
+}
diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/URL.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/URL.java
index 98487800..24e71b64 100644
--- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/URL.java
+++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/URL.java
@@ -1,5 +1,8 @@
package tech.powerjob.remote.framework.base;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
import java.io.Serializable;
/**
@@ -8,6 +11,8 @@ import java.io.Serializable;
* @author tjq
* @since 2022/12/31
*/
+@Data
+@Accessors(chain = true)
public class URL implements Serializable {
/**
* remote address
diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java
index a48a9078..a64051fd 100644
--- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java
+++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializerConfig.java
@@ -4,6 +4,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import tech.powerjob.remote.framework.base.Address;
+import tech.powerjob.remote.framework.base.ServerType;
import java.io.Serializable;
@@ -19,4 +20,6 @@ import java.io.Serializable;
public class CSInitializerConfig implements Serializable {
private Address bindAddress;
+
+ private ServerType serverType;
}
diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java
index 0e1efa05..4120f7ea 100644
--- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java
+++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java
@@ -5,6 +5,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import tech.powerjob.remote.framework.base.Address;
+import tech.powerjob.remote.framework.base.ServerType;
import java.io.Serializable;
import java.util.Set;
@@ -19,6 +20,7 @@ import java.util.Set;
@Accessors(chain = true)
public class EngineConfig implements Serializable {
+ private ServerType serverType;
/**
* 需要启动的引擎类型
*/
diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java
index 9e890265..1ee8cc44 100644
--- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java
+++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java
@@ -37,7 +37,10 @@ public class PowerJobRemoteEngine implements RemoteEngine {
Stopwatch sw = Stopwatch.createStarted();
log.info("[PowerJobRemoteEngine] try to startup CSInitializer[type={}]", type);
- csInitializer.init(new CSInitializerConfig().setBindAddress(engineConfig.getBindAddress()));
+ csInitializer.init(new CSInitializerConfig()
+ .setBindAddress(engineConfig.getBindAddress())
+ .setServerType(engineConfig.getServerType())
+ );
Transporter transporter = csInitializer.buildTransporter();
engineOutput.getType2Transport().put(type, transporter);
diff --git a/powerjob-remote/powerjob-remote-impl-akka/pom.xml b/powerjob-remote/powerjob-remote-impl-akka/pom.xml
new file mode 100644
index 00000000..e422b0c8
--- /dev/null
+++ b/powerjob-remote/powerjob-remote-impl-akka/pom.xml
@@ -0,0 +1,45 @@
+
+
+
+ powerjob-remote
+ tech.powerjob
+ 3.0.0
+
+ 4.0.0
+
+ powerjob-remote-impl-akka
+ 4.2.0
+
+
+ 8
+ 8
+ UTF-8
+
+ 4.2.0
+
+ 2.6.12
+
+
+
+
+ tech.powerjob
+ powerjob-remote-framework
+ ${powerjob-remote-framework.version}
+
+
+
+
+ com.typesafe.akka
+ akka-remote_2.13
+ ${akka.version}
+
+
+ com.typesafe.akka
+ akka-slf4j_2.13
+ ${akka.version}
+
+
+
+
\ No newline at end of file
diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java
new file mode 100644
index 00000000..ae6e201c
--- /dev/null
+++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaCSInitializer.java
@@ -0,0 +1,75 @@
+package tech.powerjob.remote.akka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.DeadLetter;
+import akka.actor.Props;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import tech.powerjob.remote.framework.actor.HandlerInfo;
+import tech.powerjob.remote.framework.base.Address;
+import tech.powerjob.remote.framework.base.ServerType;
+import tech.powerjob.remote.framework.cs.CSInitializer;
+import tech.powerjob.remote.framework.cs.CSInitializerConfig;
+import tech.powerjob.remote.framework.transporter.Transporter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AkkaCSInitializer
+ *
+ * @author tjq
+ * @since 2022/12/31
+ */
+public class AkkaCSInitializer implements CSInitializer {
+
+ private ActorSystem actorSystem;
+ private CSInitializerConfig config;
+
+ @Override
+ public String type() {
+ return AkkaConstant.PROTOCOL;
+ }
+
+ @Override
+ public void init(CSInitializerConfig config) {
+
+ this.config = config;
+
+ Address bindAddress = config.getBindAddress();
+
+ // 初始化 ActorSystem(macOS上 new ServerSocket 检测端口占用的方法并不生效,可能是AKKA是Scala写的缘故?没办法...只能靠异常重试了)
+ Map overrideConfig = Maps.newHashMap();
+ overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost());
+ overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort());
+
+ Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG);
+ Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
+
+ // 启动时绑定当前的 actorSystemName
+ String actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType(), false);
+ this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig);
+
+ // 处理系统中产生的异常情况
+ ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(AkkaTroubleshootingActor.class), "troubleshooting");
+ actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
+ }
+
+ @Override
+ public Transporter buildTransporter() {
+ return new AkkaTransporter(config.getServerType(), actorSystem);
+ }
+
+ @Override
+ public void bindHandlers(List handlerInfos) {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ actorSystem.terminate();
+ }
+}
diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaConstant.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaConstant.java
new file mode 100644
index 00000000..425f6981
--- /dev/null
+++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaConstant.java
@@ -0,0 +1,36 @@
+package tech.powerjob.remote.akka;
+
+import tech.powerjob.remote.framework.base.ServerType;
+
+/**
+ * AkkaConstant
+ *
+ * @author tjq
+ * @since 2022/12/31
+ */
+public class AkkaConstant {
+
+ public static final String PROTOCOL = "AKKA";
+
+ public static final String AKKA_CONFIG = "powerjob.akka.conf";
+
+ public static final String WORKER_ACTOR_SYSTEM_NAME = "oms";
+ public static final String SERVER_ACTOR_SYSTEM_NAME = "oms-server";
+
+ /**
+ * 获取 actorSystem 名称
+ * @param serverType 当前服务器类型,powerjob-server 为 server,powerjob-worker 为 worker
+ * @param reversed 是否反向输出,默认输出当前服务器对应的 actorSystemName,reversed = true 后倒置为目标服务器的 actorSystemName
+ * @return actorSystemName
+ */
+ public static String fetchActorSystemName(ServerType serverType, boolean reversed) {
+
+ boolean outputServer = serverType == ServerType.SERVER;
+ if (reversed) {
+ outputServer = !outputServer;
+ }
+
+ return outputServer ? SERVER_ACTOR_SYSTEM_NAME : WORKER_ACTOR_SYSTEM_NAME;
+ }
+
+}
diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProtocol.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProtocol.java
new file mode 100644
index 00000000..e2e04c0b
--- /dev/null
+++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProtocol.java
@@ -0,0 +1,16 @@
+package tech.powerjob.remote.akka;
+
+import tech.powerjob.remote.framework.transporter.Protocol;
+
+/**
+ * AkkaProtocol
+ *
+ * @author tjq
+ * @since 2022/12/31
+ */
+public class AkkaProtocol implements Protocol {
+ @Override
+ public String name() {
+ return AkkaConstant.PROTOCOL;
+ }
+}
diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java
new file mode 100644
index 00000000..b03358f8
--- /dev/null
+++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java
@@ -0,0 +1,81 @@
+package tech.powerjob.remote.akka;
+
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import com.google.common.collect.Maps;
+import tech.powerjob.common.PowerSerializable;
+import tech.powerjob.common.RemoteConstant;
+import tech.powerjob.common.utils.CommonUtils;
+import tech.powerjob.remote.framework.base.RemotingException;
+import tech.powerjob.remote.framework.base.ServerType;
+import tech.powerjob.remote.framework.base.URL;
+import tech.powerjob.remote.framework.transporter.Protocol;
+import tech.powerjob.remote.framework.transporter.Transporter;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * AkkaTransporter
+ *
+ * @author tjq
+ * @since 2022/12/31
+ */
+public class AkkaTransporter implements Transporter {
+
+ private final ServerType serverType;
+ private final ActorSystem actorSystem;
+
+ private final String targetActorSystemName;
+
+ /**
+ * akka://@:/
+ */
+ private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";
+
+ private static final Map SERVER_PATH_MAP = Maps.newHashMap();
+ private static final Map WORKER_PATH_MAP = Maps.newHashMap();
+
+ static {
+ SERVER_PATH_MAP.put("", "");
+
+ WORKER_PATH_MAP.put("", "");
+ }
+
+ public AkkaTransporter(ServerType serverType, ActorSystem actorSystem) {
+ this.actorSystem = actorSystem;
+ this.serverType = serverType;
+ this.targetActorSystemName = AkkaConstant.fetchActorSystemName(serverType, true);
+ }
+
+ @Override
+ public Protocol getProtocol() {
+ return new AkkaProtocol();
+ }
+
+ @Override
+ public void tell(URL url, PowerSerializable request) {
+ ActorSelection actorSelection = fetchActorSelection(url);
+ actorSelection.tell(request, null);
+ }
+
+ @Override
+ public CompletionStage