diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/ActorInfo.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/ActorInfo.java index 23c30849..d139dc73 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/ActorInfo.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/ActorInfo.java @@ -18,7 +18,7 @@ import java.util.List; @Accessors(chain = true) public class ActorInfo { - private PowerJobActor actor; + private Object actor; private Actor anno; diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/PowerJobActor.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/PowerJobActor.java deleted file mode 100644 index 3d0b757b..00000000 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/actor/PowerJobActor.java +++ /dev/null @@ -1,10 +0,0 @@ -package tech.powerjob.remote.framework.actor; - -/** - * 方便后续扩展,比如启动回调等 - * - * @author tjq - * @since 2023/1/6 - */ -public interface PowerJobActor { -} diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializer.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializer.java index 723379a6..a5e140d9 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializer.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/cs/CSInitializer.java @@ -4,6 +4,7 @@ import tech.powerjob.remote.framework.actor.ActorInfo; import tech.powerjob.remote.framework.transporter.Transporter; import java.io.Closeable; +import java.io.IOException; import java.util.List; /** @@ -12,7 +13,7 @@ import java.util.List; * @author MuBao * @since 2022/12/31 */ -public interface CSInitializer extends Closeable { +public interface CSInitializer { /** * 类型名称,比如 akka, netty4,httpJson @@ -37,4 +38,6 @@ public interface CSInitializer extends Closeable { * @param actorInfos actor infos */ void bindHandlers(List actorInfos); + + void close() throws IOException; } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java index 6c22b99e..a9b82961 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineConfig.java @@ -1,16 +1,12 @@ package tech.powerjob.remote.framework.engine; import lombok.Data; -import lombok.Getter; -import lombok.Setter; import lombok.experimental.Accessors; -import tech.powerjob.remote.framework.actor.PowerJobActor; import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.ServerType; import java.io.Serializable; import java.util.List; -import java.util.Set; /** * EngineConfig @@ -22,11 +18,14 @@ import java.util.Set; @Accessors(chain = true) public class EngineConfig implements Serializable { + /** + * 服务类型 + */ private ServerType serverType; /** * 需要启动的引擎类型 */ - private Set types; + private String type; /** * 绑定的本地地址 */ @@ -34,5 +33,5 @@ public class EngineConfig implements Serializable { /** * actor实例,交由使用侧自己实例化以便自行注入各种 bean */ - private List actorList; + private List actorList; } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineOutput.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineOutput.java index 3d7ee5c0..2bb2b999 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineOutput.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/EngineOutput.java @@ -2,6 +2,7 @@ package tech.powerjob.remote.framework.engine; import com.google.common.collect.Maps; import lombok.Getter; +import lombok.Setter; import tech.powerjob.remote.framework.transporter.Transporter; import java.util.Map; @@ -13,6 +14,7 @@ import java.util.Map; * @since 2022/12/31 */ @Getter +@Setter public class EngineOutput { - private Map type2Transport = Maps.newHashMap(); + private Transporter transporter; } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/ActorFactory.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/ActorFactory.java index f1628118..21f6e430 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/ActorFactory.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/ActorFactory.java @@ -24,12 +24,12 @@ import java.util.Set; @Slf4j class ActorFactory { - static List load(List actorList) { + static List load(List actorList) { List actorInfos = Lists.newArrayList(); actorList.forEach(actor -> { - final Class clz = actor.getClass(); + final Class clz = actor.getClass(); try { final Actor anno = clz.getAnnotation(Actor.class); diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/CSInitializerFactory.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/CSInitializerFactory.java index bf641240..eb1452d0 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/CSInitializerFactory.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/CSInitializerFactory.java @@ -1,7 +1,5 @@ package tech.powerjob.remote.framework.engine.impl; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; import org.reflections.Reflections; @@ -9,9 +7,7 @@ import tech.powerjob.common.OmsConstant; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.remote.framework.cs.CSInitializer; -import java.util.List; import java.util.Set; -import java.util.stream.Collectors; /** * build CSInitializer @@ -22,39 +18,27 @@ import java.util.stream.Collectors; @Slf4j class CSInitializerFactory { - static List build(Set types) { + static CSInitializer build(String targetType) { Reflections reflections = new Reflections(OmsConstant.PACKAGE); Set> cSInitializerClzSet = reflections.getSubTypesOf(CSInitializer.class); log.info("[CSInitializerFactory] scan subTypeOf CSInitializer: {}", cSInitializerClzSet); - List ret = Lists.newArrayList(); - - cSInitializerClzSet.forEach(clz -> { + for (Class clz : cSInitializerClzSet) { try { CSInitializer csInitializer = clz.getDeclaredConstructor().newInstance(); String type = csInitializer.type(); log.info("[CSInitializerFactory] new instance for CSInitializer[{}] successfully, type={}, object: {}", clz, type, csInitializer); - if (types.contains(type)) { - ret.add(csInitializer); + if (targetType.equalsIgnoreCase(type)) { + return csInitializer; } } catch (Exception e) { log.error("[CSInitializerFactory] new instance for CSInitializer[{}] failed, maybe you should provide a non-parameter constructor", clz); ExceptionUtils.rethrow(e); } - }); - - Set loadTypes = ret.stream().map(CSInitializer::type).collect(Collectors.toSet()); - log.info("[CSInitializerFactory] final load types: {}", loadTypes); - - if (types.size() == ret.size()) { - return ret; } - Set remainTypes = Sets.newHashSet(types); - remainTypes.removeAll(loadTypes); - - throw new PowerJobException(String.format("can't load these CSInitializer[%s], ensure your package name start with 'tech.powerjob'!", remainTypes)); + throw new PowerJobException(String.format("can't load CSInitializer[%s], ensure your package name start with 'tech.powerjob'!", targetType)); } } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java index 5d69ee98..8fdd9c28 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java @@ -28,26 +28,26 @@ public class PowerJobRemoteEngine implements RemoteEngine { log.info("[PowerJobRemoteEngine] start remote engine with config: {}", engineConfig); List actorInfos = ActorFactory.load(engineConfig.getActorList()); - List csInitializerList = CSInitializerFactory.build(engineConfig.getTypes()); + CSInitializer csInitializer = CSInitializerFactory.build(engineConfig.getType()); - csInitializerList.forEach(csInitializer -> { + String type = csInitializer.type(); - String type = csInitializer.type(); + Stopwatch sw = Stopwatch.createStarted(); + log.info("[PowerJobRemoteEngine] try to startup CSInitializer[type={}]", type); - Stopwatch sw = Stopwatch.createStarted(); - log.info("[PowerJobRemoteEngine] try to startup CSInitializer[type={}]", type); + csInitializer.init(new CSInitializerConfig() + .setBindAddress(engineConfig.getBindAddress()) + .setServerType(engineConfig.getServerType()) + ); - csInitializer.init(new CSInitializerConfig() - .setBindAddress(engineConfig.getBindAddress()) - .setServerType(engineConfig.getServerType()) - ); - Transporter transporter = csInitializer.buildTransporter(); - engineOutput.getType2Transport().put(type, transporter); + // 构建通讯器 + Transporter transporter = csInitializer.buildTransporter(); + engineOutput.setTransporter(transporter); - csInitializer.bindHandlers(actorInfos); + // 绑定 handler + csInitializer.bindHandlers(actorInfos); - log.info("[PowerJobRemoteEngine] startup CSInitializer[type={}] successfully, cost: {}", type, sw); - }); + log.info("[PowerJobRemoteEngine] startup CSInitializer[type={}] successfully, cost: {}", type, sw); return engineOutput; } diff --git a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/engine/RemoteEngineTest.java b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/engine/RemoteEngineTest.java index 983d0893..eaf8c79d 100644 --- a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/engine/RemoteEngineTest.java +++ b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/engine/RemoteEngineTest.java @@ -21,7 +21,7 @@ class RemoteEngineTest { RemoteEngine remoteEngine = new PowerJobRemoteEngine(); EngineConfig engineConfig = new EngineConfig(); - engineConfig.setTypes(Sets.newHashSet("TEST")); + engineConfig.setType("TEST"); engineConfig.setBindAddress(new Address().setHost("127.0.0.1").setPort(10086)); remoteEngine.start(engineConfig); } diff --git a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/engine/impl/CSInitializerFactoryTest.java b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/engine/impl/CSInitializerFactoryTest.java index d970192e..657a5434 100644 --- a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/engine/impl/CSInitializerFactoryTest.java +++ b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/engine/impl/CSInitializerFactoryTest.java @@ -17,13 +17,13 @@ class CSInitializerFactoryTest { @Test void testBuildNormal() { - CSInitializerFactory.build(Sets.newHashSet("TEST")); + CSInitializerFactory.build("TEST"); } @Test void testNotFind() { Assertions.assertThrows(PowerJobException.class, () -> { - CSInitializerFactory.build(Sets.newHashSet("omicron")); + CSInitializerFactory.build("omicron"); }); } } \ No newline at end of file diff --git a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/test/TestActor.java b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/test/TestActor.java index b2d82ec0..a1d3351a 100644 --- a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/test/TestActor.java +++ b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/test/TestActor.java @@ -3,7 +3,6 @@ package tech.powerjob.remote.framework.test; import lombok.extern.slf4j.Slf4j; import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.remote.framework.actor.Handler; -import tech.powerjob.remote.framework.actor.PowerJobActor; import java.util.Map; @@ -15,7 +14,7 @@ import java.util.Map; */ @Slf4j @Actor(path = "/test") -public class TestActor implements PowerJobActor { +public class TestActor { public static void simpleStaticMethod() { } diff --git a/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java b/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java index 6f31dc82..9a25309d 100644 --- a/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java +++ b/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java @@ -1,5 +1,6 @@ package tech.powerjob.remote.http; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; @@ -37,13 +38,14 @@ class HttpVertxCSInitializerTest { final Address address = new Address().setPort(7890).setHost("127.0.0.1"); EngineConfig engineConfig = new EngineConfig() - .setTypes(Sets.newHashSet(Protocol.HTTP.name())) - .setBindAddress(address); + .setType(Protocol.HTTP.name()) + .setBindAddress(address) + .setActorList(Lists.newArrayList(new BenchmarkActor())); RemoteEngine engine = new PowerJobRemoteEngine(); EngineOutput engineOutput = engine.start(engineConfig); log.info("[HttpVertxCSInitializerTest] engine start up successfully!"); - final Transporter transporter = engineOutput.getType2Transport().get(Protocol.HTTP.name()); + Transporter transporter = engineOutput.getTransporter(); URL url = new URL() .setAddress(address)