feat: optimize remote framework

This commit is contained in:
tjq 2023-01-07 14:14:32 +08:00
parent 5d3bfedf5d
commit b0b2c24571
12 changed files with 43 additions and 64 deletions

View File

@ -18,7 +18,7 @@ import java.util.List;
@Accessors(chain = true) @Accessors(chain = true)
public class ActorInfo { public class ActorInfo {
private PowerJobActor actor; private Object actor;
private Actor anno; private Actor anno;

View File

@ -1,10 +0,0 @@
package tech.powerjob.remote.framework.actor;
/**
* 方便后续扩展比如启动回调等
*
* @author tjq
* @since 2023/1/6
*/
public interface PowerJobActor {
}

View File

@ -4,6 +4,7 @@ import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.transporter.Transporter; import tech.powerjob.remote.framework.transporter.Transporter;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
import java.util.List; import java.util.List;
/** /**
@ -12,7 +13,7 @@ import java.util.List;
* @author MuBao * @author MuBao
* @since 2022/12/31 * @since 2022/12/31
*/ */
public interface CSInitializer extends Closeable { public interface CSInitializer {
/** /**
* 类型名称比如 akka, netty4httpJson * 类型名称比如 akka, netty4httpJson
@ -37,4 +38,6 @@ public interface CSInitializer extends Closeable {
* @param actorInfos actor infos * @param actorInfos actor infos
*/ */
void bindHandlers(List<ActorInfo> actorInfos); void bindHandlers(List<ActorInfo> actorInfos);
void close() throws IOException;
} }

View File

@ -1,16 +1,12 @@
package tech.powerjob.remote.framework.engine; package tech.powerjob.remote.framework.engine;
import lombok.Data; import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import tech.powerjob.remote.framework.actor.PowerJobActor;
import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.ServerType; import tech.powerjob.remote.framework.base.ServerType;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* EngineConfig * EngineConfig
@ -22,11 +18,14 @@ import java.util.Set;
@Accessors(chain = true) @Accessors(chain = true)
public class EngineConfig implements Serializable { public class EngineConfig implements Serializable {
/**
* 服务类型
*/
private ServerType serverType; private ServerType serverType;
/** /**
* 需要启动的引擎类型 * 需要启动的引擎类型
*/ */
private Set<String> types; private String type;
/** /**
* 绑定的本地地址 * 绑定的本地地址
*/ */
@ -34,5 +33,5 @@ public class EngineConfig implements Serializable {
/** /**
* actor实例交由使用侧自己实例化以便自行注入各种 bean * actor实例交由使用侧自己实例化以便自行注入各种 bean
*/ */
private List<PowerJobActor> actorList; private List<Object> actorList;
} }

View File

@ -2,6 +2,7 @@ package tech.powerjob.remote.framework.engine;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.Getter; import lombok.Getter;
import lombok.Setter;
import tech.powerjob.remote.framework.transporter.Transporter; import tech.powerjob.remote.framework.transporter.Transporter;
import java.util.Map; import java.util.Map;
@ -13,6 +14,7 @@ import java.util.Map;
* @since 2022/12/31 * @since 2022/12/31
*/ */
@Getter @Getter
@Setter
public class EngineOutput { public class EngineOutput {
private Map<String, Transporter> type2Transport = Maps.newHashMap(); private Transporter transporter;
} }

View File

@ -24,12 +24,12 @@ import java.util.Set;
@Slf4j @Slf4j
class ActorFactory { class ActorFactory {
static List<ActorInfo> load(List<PowerJobActor> actorList) { static List<ActorInfo> load(List<Object> actorList) {
List<ActorInfo> actorInfos = Lists.newArrayList(); List<ActorInfo> actorInfos = Lists.newArrayList();
actorList.forEach(actor -> { actorList.forEach(actor -> {
final Class<? extends PowerJobActor> clz = actor.getClass(); final Class<?> clz = actor.getClass();
try { try {
final Actor anno = clz.getAnnotation(Actor.class); final Actor anno = clz.getAnnotation(Actor.class);

View File

@ -1,7 +1,5 @@
package tech.powerjob.remote.framework.engine.impl; package tech.powerjob.remote.framework.engine.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.reflections.Reflections; import org.reflections.Reflections;
@ -9,9 +7,7 @@ import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.remote.framework.cs.CSInitializer; import tech.powerjob.remote.framework.cs.CSInitializer;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
/** /**
* build CSInitializer * build CSInitializer
@ -22,39 +18,27 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
class CSInitializerFactory { class CSInitializerFactory {
static List<CSInitializer> build(Set<String> types) { static CSInitializer build(String targetType) {
Reflections reflections = new Reflections(OmsConstant.PACKAGE); Reflections reflections = new Reflections(OmsConstant.PACKAGE);
Set<Class<? extends CSInitializer>> cSInitializerClzSet = reflections.getSubTypesOf(CSInitializer.class); Set<Class<? extends CSInitializer>> cSInitializerClzSet = reflections.getSubTypesOf(CSInitializer.class);
log.info("[CSInitializerFactory] scan subTypeOf CSInitializer: {}", cSInitializerClzSet); log.info("[CSInitializerFactory] scan subTypeOf CSInitializer: {}", cSInitializerClzSet);
List<CSInitializer> ret = Lists.newArrayList(); for (Class<? extends CSInitializer> clz : cSInitializerClzSet) {
cSInitializerClzSet.forEach(clz -> {
try { try {
CSInitializer csInitializer = clz.getDeclaredConstructor().newInstance(); CSInitializer csInitializer = clz.getDeclaredConstructor().newInstance();
String type = csInitializer.type(); String type = csInitializer.type();
log.info("[CSInitializerFactory] new instance for CSInitializer[{}] successfully, type={}, object: {}", clz, type, csInitializer); log.info("[CSInitializerFactory] new instance for CSInitializer[{}] successfully, type={}, object: {}", clz, type, csInitializer);
if (types.contains(type)) { if (targetType.equalsIgnoreCase(type)) {
ret.add(csInitializer); return csInitializer;
} }
} catch (Exception e) { } catch (Exception e) {
log.error("[CSInitializerFactory] new instance for CSInitializer[{}] failed, maybe you should provide a non-parameter constructor", clz); log.error("[CSInitializerFactory] new instance for CSInitializer[{}] failed, maybe you should provide a non-parameter constructor", clz);
ExceptionUtils.rethrow(e); ExceptionUtils.rethrow(e);
} }
});
Set<String> loadTypes = ret.stream().map(CSInitializer::type).collect(Collectors.toSet());
log.info("[CSInitializerFactory] final load types: {}", loadTypes);
if (types.size() == ret.size()) {
return ret;
} }
Set<String> remainTypes = Sets.newHashSet(types); throw new PowerJobException(String.format("can't load CSInitializer[%s], ensure your package name start with 'tech.powerjob'!", targetType));
remainTypes.removeAll(loadTypes);
throw new PowerJobException(String.format("can't load these CSInitializer[%s], ensure your package name start with 'tech.powerjob'!", remainTypes));
} }
} }

View File

@ -28,26 +28,26 @@ public class PowerJobRemoteEngine implements RemoteEngine {
log.info("[PowerJobRemoteEngine] start remote engine with config: {}", engineConfig); log.info("[PowerJobRemoteEngine] start remote engine with config: {}", engineConfig);
List<ActorInfo> actorInfos = ActorFactory.load(engineConfig.getActorList()); List<ActorInfo> actorInfos = ActorFactory.load(engineConfig.getActorList());
List<CSInitializer> csInitializerList = CSInitializerFactory.build(engineConfig.getTypes()); CSInitializer csInitializer = CSInitializerFactory.build(engineConfig.getType());
csInitializerList.forEach(csInitializer -> { String type = csInitializer.type();
String type = csInitializer.type(); Stopwatch sw = Stopwatch.createStarted();
log.info("[PowerJobRemoteEngine] try to startup CSInitializer[type={}]", type);
Stopwatch sw = Stopwatch.createStarted(); csInitializer.init(new CSInitializerConfig()
log.info("[PowerJobRemoteEngine] try to startup CSInitializer[type={}]", type); .setBindAddress(engineConfig.getBindAddress())
.setServerType(engineConfig.getServerType())
);
csInitializer.init(new CSInitializerConfig() // 构建通讯器
.setBindAddress(engineConfig.getBindAddress()) Transporter transporter = csInitializer.buildTransporter();
.setServerType(engineConfig.getServerType()) engineOutput.setTransporter(transporter);
);
Transporter transporter = csInitializer.buildTransporter();
engineOutput.getType2Transport().put(type, transporter);
csInitializer.bindHandlers(actorInfos); // 绑定 handler
csInitializer.bindHandlers(actorInfos);
log.info("[PowerJobRemoteEngine] startup CSInitializer[type={}] successfully, cost: {}", type, sw); log.info("[PowerJobRemoteEngine] startup CSInitializer[type={}] successfully, cost: {}", type, sw);
});
return engineOutput; return engineOutput;
} }

View File

@ -21,7 +21,7 @@ class RemoteEngineTest {
RemoteEngine remoteEngine = new PowerJobRemoteEngine(); RemoteEngine remoteEngine = new PowerJobRemoteEngine();
EngineConfig engineConfig = new EngineConfig(); EngineConfig engineConfig = new EngineConfig();
engineConfig.setTypes(Sets.newHashSet("TEST")); engineConfig.setType("TEST");
engineConfig.setBindAddress(new Address().setHost("127.0.0.1").setPort(10086)); engineConfig.setBindAddress(new Address().setHost("127.0.0.1").setPort(10086));
remoteEngine.start(engineConfig); remoteEngine.start(engineConfig);
} }

View File

@ -17,13 +17,13 @@ class CSInitializerFactoryTest {
@Test @Test
void testBuildNormal() { void testBuildNormal() {
CSInitializerFactory.build(Sets.newHashSet("TEST")); CSInitializerFactory.build("TEST");
} }
@Test @Test
void testNotFind() { void testNotFind() {
Assertions.assertThrows(PowerJobException.class, () -> { Assertions.assertThrows(PowerJobException.class, () -> {
CSInitializerFactory.build(Sets.newHashSet("omicron")); CSInitializerFactory.build("omicron");
}); });
} }
} }

View File

@ -3,7 +3,6 @@ package tech.powerjob.remote.framework.test;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.actor.Handler; import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.PowerJobActor;
import java.util.Map; import java.util.Map;
@ -15,7 +14,7 @@ import java.util.Map;
*/ */
@Slf4j @Slf4j
@Actor(path = "/test") @Actor(path = "/test")
public class TestActor implements PowerJobActor { public class TestActor {
public static void simpleStaticMethod() { public static void simpleStaticMethod() {
} }

View File

@ -1,5 +1,6 @@
package tech.powerjob.remote.http; package tech.powerjob.remote.http;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -37,13 +38,14 @@ class HttpVertxCSInitializerTest {
final Address address = new Address().setPort(7890).setHost("127.0.0.1"); final Address address = new Address().setPort(7890).setHost("127.0.0.1");
EngineConfig engineConfig = new EngineConfig() EngineConfig engineConfig = new EngineConfig()
.setTypes(Sets.newHashSet(Protocol.HTTP.name())) .setType(Protocol.HTTP.name())
.setBindAddress(address); .setBindAddress(address)
.setActorList(Lists.newArrayList(new BenchmarkActor()));
RemoteEngine engine = new PowerJobRemoteEngine(); RemoteEngine engine = new PowerJobRemoteEngine();
EngineOutput engineOutput = engine.start(engineConfig); EngineOutput engineOutput = engine.start(engineConfig);
log.info("[HttpVertxCSInitializerTest] engine start up successfully!"); log.info("[HttpVertxCSInitializerTest] engine start up successfully!");
final Transporter transporter = engineOutput.getType2Transport().get(Protocol.HTTP.name()); Transporter transporter = engineOutput.getTransporter();
URL url = new URL() URL url = new URL()
.setAddress(address) .setAddress(address)