mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: finished remote engine
This commit is contained in:
parent
84ef2fd120
commit
4356c5566d
@ -8,6 +8,11 @@ package tech.powerjob.common;
|
||||
*/
|
||||
public class OmsConstant {
|
||||
|
||||
/**
|
||||
* package name
|
||||
*/
|
||||
public static final String PACKAGE = "tech.powerjob";
|
||||
|
||||
public static final int SERVER_DEFAULT_AKKA_PORT = 10086;
|
||||
public static final int SERVER_DEFAULT_HTTP_PORT = 10010;
|
||||
|
||||
|
@ -17,6 +17,10 @@
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
|
||||
<powerjob-common.version>4.2.0</powerjob-common.version>
|
||||
<reflections.version>0.10.2</reflections.version>
|
||||
|
||||
<junit.version>5.9.0</junit.version>
|
||||
<logback.version>1.2.9</logback.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
@ -25,6 +29,27 @@
|
||||
<artifactId>powerjob-common</artifactId>
|
||||
<version>${powerjob-common.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.reflections</groupId>
|
||||
<artifactId>reflections</artifactId>
|
||||
<version>${reflections.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Junit tests -->
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
<version>${junit.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!-- log for test stage -->
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>${logback.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -0,0 +1,28 @@
|
||||
package tech.powerjob.remote.framework.actor;
|
||||
|
||||
|
||||
/**
|
||||
* ActorInfo
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
public class ActorInfo {
|
||||
|
||||
private final Object actor;
|
||||
|
||||
private final Actor anno;
|
||||
|
||||
public ActorInfo(Object actor, Actor anno) {
|
||||
this.actor = actor;
|
||||
this.anno = anno;
|
||||
}
|
||||
|
||||
public Object getActor() {
|
||||
return actor;
|
||||
}
|
||||
|
||||
public Actor getAnno() {
|
||||
return anno;
|
||||
}
|
||||
}
|
@ -1,5 +1,9 @@
|
||||
package tech.powerjob.remote.framework.actor;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.Accessors;
|
||||
import tech.powerjob.remote.framework.base.HandlerLocation;
|
||||
|
||||
import java.io.Serializable;
|
||||
@ -11,7 +15,11 @@ import java.lang.reflect.Method;
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
public class HandlerInfo implements Serializable {
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class HandlerInfo {
|
||||
|
||||
private HandlerLocation location;
|
||||
/**
|
||||
@ -21,5 +29,5 @@ public class HandlerInfo implements Serializable {
|
||||
/**
|
||||
* actor 对象
|
||||
*/
|
||||
private Object actor;
|
||||
private transient ActorInfo actorInfo;
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package tech.powerjob.remote.framework.base;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
import java.io.Serializable;
|
||||
@ -14,6 +15,7 @@ import java.io.Serializable;
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class HandlerLocation implements Serializable {
|
||||
/**
|
||||
|
@ -0,0 +1,30 @@
|
||||
package tech.powerjob.remote.framework.engine;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
import tech.powerjob.remote.framework.base.Address;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* EngineConfig
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
public class EngineConfig implements Serializable {
|
||||
|
||||
/**
|
||||
* 需要启动的引擎类型
|
||||
*/
|
||||
private Set<String> types;
|
||||
/**
|
||||
* 绑定的本地地址
|
||||
*/
|
||||
private Address bindAddress;
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package tech.powerjob.remote.framework.engine;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.Getter;
|
||||
import tech.powerjob.remote.framework.transporter.Transporter;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 引擎输出
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
@Getter
|
||||
public class EngineOutput {
|
||||
private Map<String, Transporter> type2Transport = Maps.newHashMap();
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package tech.powerjob.remote.framework.engine;
|
||||
|
||||
/**
|
||||
* RemoteEngine
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
public interface RemoteEngine {
|
||||
|
||||
EngineOutput start(EngineConfig engineConfig);
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
package tech.powerjob.remote.framework.engine.impl;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.reflections.Reflections;
|
||||
import tech.powerjob.common.OmsConstant;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.remote.framework.cs.CSInitializer;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* build CSInitializer
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
@Slf4j
|
||||
class CSInitializerFactory {
|
||||
|
||||
static List<CSInitializer> build(Set<String> types) {
|
||||
|
||||
Reflections reflections = new Reflections(OmsConstant.PACKAGE);
|
||||
Set<Class<? extends CSInitializer>> cSInitializerClzSet = reflections.getSubTypesOf(CSInitializer.class);
|
||||
|
||||
log.info("[CSInitializerFactory] scan subTypeOf CSInitializer: {}", cSInitializerClzSet);
|
||||
|
||||
List<CSInitializer> ret = Lists.newArrayList();
|
||||
|
||||
cSInitializerClzSet.forEach(clz -> {
|
||||
try {
|
||||
CSInitializer csInitializer = clz.getDeclaredConstructor().newInstance();
|
||||
String type = csInitializer.type();
|
||||
log.info("[CSInitializerFactory] new instance for CSInitializer[{}] successfully, type={}, object: {}", clz, type, csInitializer);
|
||||
if (types.contains(type)) {
|
||||
ret.add(csInitializer);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[CSInitializerFactory] new instance for CSInitializer[{}] failed, maybe you should provide a non-parameter constructor", clz);
|
||||
ExceptionUtils.rethrow(e);
|
||||
}
|
||||
});
|
||||
|
||||
Set<String> loadTypes = ret.stream().map(CSInitializer::type).collect(Collectors.toSet());
|
||||
log.info("[CSInitializerFactory] final load types: {}", loadTypes);
|
||||
|
||||
if (types.size() == ret.size()) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
Set<String> remainTypes = Sets.newHashSet(types);
|
||||
remainTypes.removeAll(loadTypes);
|
||||
|
||||
throw new PowerJobException(String.format("can't load these CSInitializer[%s], ensure your package name start with 'tech.powerjob'!", remainTypes));
|
||||
}
|
||||
}
|
@ -0,0 +1,76 @@
|
||||
package tech.powerjob.remote.framework.engine.impl;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.reflections.ReflectionUtils;
|
||||
import org.reflections.Reflections;
|
||||
import tech.powerjob.common.OmsConstant;
|
||||
import tech.powerjob.remote.framework.actor.Actor;
|
||||
import tech.powerjob.remote.framework.actor.ActorInfo;
|
||||
import tech.powerjob.remote.framework.actor.Handler;
|
||||
import tech.powerjob.remote.framework.actor.HandlerInfo;
|
||||
import tech.powerjob.remote.framework.base.HandlerLocation;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* load all Actor
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
@Slf4j
|
||||
class HandlerFactory {
|
||||
|
||||
public static List<HandlerInfo> load() {
|
||||
List<ActorInfo> actorInfos = loadActorInfos();
|
||||
List<HandlerInfo> ret = Lists.newArrayList();
|
||||
actorInfos.forEach(actorInfo -> {
|
||||
Actor anno = actorInfo.getAnno();
|
||||
String rootPath = anno.path();
|
||||
Object actor = actorInfo.getActor();
|
||||
Set<Method> allHandlerMethods = ReflectionUtils.getAllMethods(actor.getClass(), (input -> input != null && input.isAnnotationPresent(Handler.class)));
|
||||
allHandlerMethods.forEach(handlerMethod -> {
|
||||
Handler handlerMethodAnnotation = handlerMethod.getAnnotation(Handler.class);
|
||||
|
||||
HandlerLocation handlerLocation = new HandlerLocation()
|
||||
.setRootPath(rootPath)
|
||||
.setMethodPath(handlerMethodAnnotation.path());
|
||||
|
||||
|
||||
HandlerInfo handlerInfo = new HandlerInfo()
|
||||
.setActorInfo(actorInfo)
|
||||
.setMethod(handlerMethod)
|
||||
.setLocation(handlerLocation);
|
||||
ret.add(handlerInfo);
|
||||
});
|
||||
|
||||
});
|
||||
return ret;
|
||||
}
|
||||
|
||||
static List<ActorInfo> loadActorInfos() {
|
||||
Reflections reflections = new Reflections(OmsConstant.PACKAGE);
|
||||
final Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(Actor.class);
|
||||
|
||||
List<ActorInfo> actorInfos = Lists.newArrayList();
|
||||
typesAnnotatedWith.forEach(clz -> {
|
||||
try {
|
||||
final Actor anno = clz.getAnnotation(Actor.class);
|
||||
final Object object = clz.getDeclaredConstructor().newInstance();
|
||||
|
||||
log.info("[ActorFactory] load Actor[clz={},path={}] successfully!", clz, anno.path());
|
||||
|
||||
actorInfos.add(new ActorInfo(object, anno));
|
||||
} catch (Throwable t) {
|
||||
log.error("[ActorFactory] process Actor[{}] failed!", clz);
|
||||
ExceptionUtils.rethrow(t);
|
||||
}
|
||||
});
|
||||
|
||||
return actorInfos;
|
||||
}
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
package tech.powerjob.remote.framework.engine.impl;
|
||||
|
||||
import com.google.common.base.Stopwatch;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import tech.powerjob.remote.framework.actor.HandlerInfo;
|
||||
import tech.powerjob.remote.framework.cs.CSInitializer;
|
||||
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
|
||||
import tech.powerjob.remote.framework.engine.EngineConfig;
|
||||
import tech.powerjob.remote.framework.engine.EngineOutput;
|
||||
import tech.powerjob.remote.framework.engine.RemoteEngine;
|
||||
import tech.powerjob.remote.framework.transporter.Transporter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 初始化 PowerJob 整个网络层
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
@Slf4j
|
||||
public class PowerJobRemoteEngine implements RemoteEngine {
|
||||
|
||||
@Override
|
||||
public EngineOutput start(EngineConfig engineConfig) {
|
||||
|
||||
EngineOutput engineOutput = new EngineOutput();
|
||||
log.info("[PowerJobRemoteEngine] start remote engine with config: {}", engineConfig);
|
||||
|
||||
List<HandlerInfo> handlerInfos = HandlerFactory.load();
|
||||
List<CSInitializer> csInitializerList = CSInitializerFactory.build(engineConfig.getTypes());
|
||||
|
||||
csInitializerList.forEach(csInitializer -> {
|
||||
|
||||
String type = csInitializer.type();
|
||||
|
||||
Stopwatch sw = Stopwatch.createStarted();
|
||||
log.info("[PowerJobRemoteEngine] try to startup CSInitializer[type={}]", type);
|
||||
|
||||
csInitializer.init(new CSInitializerConfig().setBindAddress(engineConfig.getBindAddress()));
|
||||
Transporter transporter = csInitializer.buildTransporter();
|
||||
engineOutput.getType2Transport().put(type, transporter);
|
||||
|
||||
csInitializer.bindHandlers(handlerInfos);
|
||||
|
||||
log.info("[PowerJobRemoteEngine] startup CSInitializer[type={}] successfully, cost: {}", type, sw);
|
||||
});
|
||||
|
||||
return engineOutput;
|
||||
}
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
package tech.powerjob.remote.framework.engine;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import tech.powerjob.remote.framework.base.Address;
|
||||
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
/**
|
||||
* RemoteEngineTest
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
class RemoteEngineTest {
|
||||
|
||||
@Test
|
||||
void start() {
|
||||
|
||||
RemoteEngine remoteEngine = new PowerJobRemoteEngine();
|
||||
|
||||
EngineConfig engineConfig = new EngineConfig();
|
||||
engineConfig.setTypes(Sets.newHashSet("TEST"));
|
||||
engineConfig.setBindAddress(new Address().setHost("127.0.0.1").setPort(10086));
|
||||
remoteEngine.start(engineConfig);
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package tech.powerjob.remote.framework.engine.impl;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
/**
|
||||
* CSInitializerFactoryTest
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
class CSInitializerFactoryTest {
|
||||
|
||||
@Test
|
||||
void testBuildNormal() {
|
||||
CSInitializerFactory.build(Sets.newHashSet("TEST"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testNotFind() {
|
||||
Assertions.assertThrows(PowerJobException.class, () -> {
|
||||
CSInitializerFactory.build(Sets.newHashSet("omicron"));
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,38 @@
|
||||
package tech.powerjob.remote.framework.engine.impl;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import tech.powerjob.remote.framework.actor.ActorInfo;
|
||||
import tech.powerjob.remote.framework.actor.HandlerInfo;
|
||||
import tech.powerjob.remote.framework.test.TestActor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
/**
|
||||
* HandlerFactoryTest
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
@Slf4j
|
||||
class HandlerFactoryTest {
|
||||
|
||||
@Test
|
||||
void load() {
|
||||
final List<HandlerInfo> handlerInfos = HandlerFactory.load();
|
||||
log.info("[HandlerFactoryTest] handlerInfos: {}", handlerInfos);
|
||||
}
|
||||
|
||||
@Test
|
||||
void loadActorInfos() {
|
||||
final List<ActorInfo> actorInfos = HandlerFactory.loadActorInfos();
|
||||
final Set<String> clzNames = actorInfos.stream().map(x -> x.getActor().getClass().getName()).collect(Collectors.toSet());
|
||||
log.info("[HandlerFactoryTest] all load clzNames: {}", clzNames);
|
||||
|
||||
assert clzNames.contains(TestActor.class.getName());
|
||||
}
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
package tech.powerjob.remote.framework.test;
|
||||
|
||||
import tech.powerjob.remote.framework.actor.Actor;
|
||||
import tech.powerjob.remote.framework.actor.Handler;
|
||||
|
||||
/**
|
||||
* TestActor
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
@Actor(path = "/test")
|
||||
public class TestActor {
|
||||
|
||||
public static void simpleStaticMethod() {
|
||||
}
|
||||
|
||||
public void simpleMethod() {
|
||||
}
|
||||
|
||||
@Handler(path = "/method1")
|
||||
public String handlerMethod1() {
|
||||
return "1";
|
||||
}
|
||||
|
||||
@Handler(path = "/method2")
|
||||
public String handlerMethod2(String name) {
|
||||
return name;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
package tech.powerjob.remote.framework.test;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import tech.powerjob.remote.framework.actor.HandlerInfo;
|
||||
import tech.powerjob.remote.framework.cs.CSInitializer;
|
||||
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
|
||||
import tech.powerjob.remote.framework.transporter.Transporter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* TestCSInitializer
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/12/31
|
||||
*/
|
||||
@Slf4j
|
||||
public class TestCSInitializer implements CSInitializer {
|
||||
@Override
|
||||
public String type() {
|
||||
return "TEST";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(CSInitializerConfig config) {
|
||||
log.info("TestCSInitializer#init");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Transporter buildTransporter() {
|
||||
log.info("TestCSInitializer#buildTransporter");
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bindHandlers(List<HandlerInfo> handlerInfos) {
|
||||
log.info("TestCSInitializer#bindHandlers");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
log.info("TestCSInitializer#close");
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user