feat: finished config

This commit is contained in:
tjq 2023-01-02 00:11:31 +08:00
parent 2c31e81c5f
commit d3bd22302f
10 changed files with 254 additions and 107 deletions

View File

@ -18,4 +18,10 @@ public @interface Handler {
* @return handler path
*/
String path();
/**
* 处理类型
* @return 阻塞 or 非阻塞
*/
ProcessType processType() default ProcessType.BLOCKING;
}

View File

@ -26,8 +26,9 @@ public class HandlerInfo {
* handler 对应的方法
*/
private Method method;
/**
* actor 对象
* Handler 注解携带的信息
*/
private transient ActorInfo actorInfo;
private Handler anno;
}

View File

@ -0,0 +1,20 @@
package tech.powerjob.remote.framework.actor;
/**
* 处理器类型
*
* @author tjq
* @since 2023/1/1
*/
public enum ProcessType {
/**
* 阻塞式
*/
BLOCKING,
/**
* 非阻塞式
*/
NO_BLOCKING
}

View File

@ -15,7 +15,6 @@ import java.io.Serializable;
*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class HandlerLocation implements Serializable {
/**
@ -30,4 +29,9 @@ public class HandlerLocation implements Serializable {
public String toPath() {
return String.format("/%s/%s", rootPath, methodPath);
}
@Override
public String toString() {
return toPath();
}
}

View File

@ -66,7 +66,7 @@ class ActorFactory {
HandlerInfo handlerInfo = new HandlerInfo()
.setActorInfo(actorInfo)
.setAnno(handlerMethodAnnotation)
.setMethod(handlerMethod)
.setLocation(handlerLocation);
ret.add(handlerInfo);

View File

@ -0,0 +1,35 @@
package tech.powerjob.remote.framework.utils;
import org.apache.commons.lang3.ArrayUtils;
import tech.powerjob.common.PowerSerializable;
import java.util.Optional;
/**
* RemoteUtils
*
* @author tjq
* @since 2023/1/1
*/
public class RemoteUtils {
public static Optional<Class<?>> findPowerSerialize(Class<?>[] parameterTypes) {
if (ArrayUtils.isEmpty(parameterTypes)) {
return Optional.empty();
}
for (Class<?> clz : parameterTypes) {
final Class<?>[] interfaces = clz.getInterfaces();
if (ArrayUtils.isEmpty(interfaces)) {
continue;
}
if (PowerSerializable.class.isAssignableFrom(clz)) {
return Optional.of(clz);
}
}
return Optional.empty();
}
}

View File

@ -0,0 +1,35 @@
package tech.powerjob.remote.framework.utils;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.request.ServerScheduleJobReq;
import java.util.Optional;
/**
* RemoteUtilsTest
*
* @author tjq
* @since 2023/1/1
*/
@Slf4j
class RemoteUtilsTest {
@Test
void findPowerSerialize() {
Class<?>[] contains = {AlarmConfig.class, ServerScheduleJobReq.class};
Class<?>[] notContains = {AlarmConfig.class};
final Optional<Class<?>> notContainsResult = RemoteUtils.findPowerSerialize(notContains);
log.info("[RemoteUtilsTest] notContainsResult: {}", notContainsResult);
final Optional<Class<?>> containsResult = RemoteUtils.findPowerSerialize(contains);
log.info("[RemoteUtilsTest] containsResult: {}", containsResult);
assert !notContainsResult.isPresent();
assert containsResult.isPresent();
}
}

View File

@ -1,57 +0,0 @@
package tech.powerjob.remote.http;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpServer;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.remote.http.vertx.VertxInitializer;
import tech.powerjob.remote.http.vertx.VertxTransporter;
import java.io.IOException;
import java.util.List;
/**
* HttpCSInitializer
*
* @author tjq
* @since 2022/12/31
*/
public class HttpCSInitializer implements CSInitializer {
private Vertx vertx;
private HttpServer httpServer;
private HttpClient httpClient;
@Override
public String type() {
return tech.powerjob.common.enums.Protocol.HTTP.name();
}
@Override
public void init(CSInitializerConfig config) {
vertx = VertxInitializer.buildVertx();
httpServer = VertxInitializer.buildHttpServer(vertx);
httpClient = VertxInitializer.buildHttpClient(vertx);
}
@Override
public Transporter buildTransporter() {
return new VertxTransporter(httpClient);
}
@Override
public void bindHandlers(List<ActorInfo> actorInfos) {
}
@Override
public void close() throws IOException {
vertx.close();
httpServer.close();
httpClient.close();
}
}

View File

@ -0,0 +1,149 @@
package tech.powerjob.remote.http;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RequestBody;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.actor.HandlerInfo;
import tech.powerjob.remote.framework.actor.ProcessType;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.remote.framework.utils.RemoteUtils;
import tech.powerjob.remote.http.vertx.VertxInitializer;
import tech.powerjob.remote.http.vertx.VertxTransporter;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
/**
* HttpCSInitializer
*
* @author tjq
* @since 2022/12/31
*/
@Slf4j
public class HttpVertxCSInitializer implements CSInitializer {
private Vertx vertx;
private HttpServer httpServer;
private HttpClient httpClient;
private CSInitializerConfig config;
@Override
public String type() {
return tech.powerjob.common.enums.Protocol.HTTP.name();
}
@Override
public void init(CSInitializerConfig config) {
this.config = config;
vertx = VertxInitializer.buildVertx();
httpServer = VertxInitializer.buildHttpServer(vertx);
httpClient = VertxInitializer.buildHttpClient(vertx);
}
@Override
public Transporter buildTransporter() {
return new VertxTransporter(httpClient);
}
@Override
public void bindHandlers(List<ActorInfo> actorInfos) {
Router router = Router.router(vertx);
// 处理请求响应
router.route().handler(BodyHandler.create());
actorInfos.forEach(actorInfo -> {
log.info("[HttpVertxCSInitializer] start to bind Actor[{}]'s handler!", actorInfo.getAnno().path());
Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> {
Method method = handlerInfo.getMethod();
String handlerHttpPath = handlerInfo.getLocation().toPath();
ProcessType processType = handlerInfo.getAnno().processType();
log.info("[HttpVertxCSInitializer] register Handler with[path={},methodName={},processType={}]", handlerHttpPath, method.getName(), processType);
Handler<RoutingContext> routingContextHandler = buildRequestHandler(actorInfo, handlerInfo);
Route route = router.post(handlerHttpPath);
if (processType == ProcessType.BLOCKING) {
route.blockingHandler(routingContextHandler, false);
} else {
route.handler(routingContextHandler);
}
});
});
// 启动 vertx http server
final int port = config.getBindAddress().getPort();
final String host = config.getBindAddress().getHost();
httpServer.requestHandler(router)
.exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e))
.listen(port, host, asyncResult -> {
if (asyncResult.succeeded()) {
log.info("[PowerJob] startup vertx HttpServer successfully!");
} else {
log.error("[PowerJob] startup vertx HttpServer failed!", asyncResult.cause());
throw new PowerJobException("startup vertx HttpServer failed", asyncResult.cause());
}
});
}
private Handler<RoutingContext> buildRequestHandler(ActorInfo actorInfo, HandlerInfo handlerInfo) {
return ctx -> {
final RequestBody body = ctx.body();
final Object convertResult = convertResult(body, handlerInfo);
try {
Object response = handlerInfo.getMethod().invoke(actorInfo.getActor(), convertResult);
if (response != null) {
if (response instanceof String) {
ctx.end((String) response);
} else {
ctx.end(JsonObject.mapFrom(response).toBuffer());
}
}
ctx.end();
} catch (Throwable t) {
// 注意这里是框架实际运行时日志输出用标准 PowerJob 格式
log.error("[PowerJob] invoke Handler[{}] failed!", handlerInfo.getLocation(), t);
ctx.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), t);
}
};
}
private static Object convertResult(RequestBody body, HandlerInfo handlerInfo) {
final Method method = handlerInfo.getMethod();
Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes());
// 内部框架严格模式绑定失败直接报错
if (!powerSerializeClz.isPresent()) {
throw new PowerJobException("can't find any 'PowerSerialize' object in handler args: " + handlerInfo.getLocation());
}
return body.asPojo(powerSerializeClz.get());
}
@Override
public void close() throws IOException {
vertx.close();
httpServer.close();
httpClient.close();
}
}

View File

@ -1,46 +0,0 @@
package tech.powerjob.remote.http.vertx;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import java.util.Map;
/**
* description
*
* @author tjq
* @since 2023/1/1
*/
public class Test {
public static void main(String[] args) {
final Vertx vertx = Vertx.vertx();
final HttpServer httpServer = vertx.createHttpServer();
final Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
router.post("/test/abc").handler(ctx -> {
final Map<String, Object> data = ctx.data();
System.out.println("ctx.data: " + data);
final String body = ctx.body().asString();
System.out.println("request: " + body);
JsonObject jsonObject = new JsonObject();
jsonObject.put("aa", "vv");
// ctx.end(jsonObject.toBuffer());
ctx.fail(404);
ctx.end("failedFromServer");
});
httpServer
.requestHandler(router)
.exceptionHandler(e -> e.printStackTrace())
.listen(7890);
System.out.println("aa");
}
}