diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/BenchmarkActor.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/BenchmarkActor.java index 049949f2..325aa6e2 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/BenchmarkActor.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/BenchmarkActor.java @@ -9,6 +9,8 @@ import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.remote.framework.actor.Handler; +import java.util.Optional; + /** * 基准测试 * @@ -20,9 +22,9 @@ import tech.powerjob.remote.framework.actor.Handler; public class BenchmarkActor { @Handler(path = "standard") - public BenchmarkResponse processStandardRequest(BenchmarkRequest request) { + public BenchmarkResponse standardRequest(BenchmarkRequest request) { long startTs = System.currentTimeMillis(); - log.info("[BenchmarkActor] receive request: {}", request); + log.info("[BenchmarkActor] [standardRequest] receive request: {}", request); BenchmarkResponse response = new BenchmarkResponse() .setSuccess(true) .setContent(request.getContent()) @@ -31,11 +33,28 @@ public class BenchmarkActor { if (request.getResponseSize() != null && request.getResponseSize() > 0) { response.setExtra(RandomStringUtils.randomPrint(request.getResponseSize())); } + executeSleep(request); + response.setServerCost(System.currentTimeMillis() - startTs); + return response; + } + + @Handler(path = "emptyReturn") + public void emptyReturn(BenchmarkRequest request) { + log.info("[BenchmarkActor] [emptyReturn] receive request: {}", request); + executeSleep(request); + } + + @Handler(path = "stringReturn") + public String stringReturn(BenchmarkRequest request) { + log.info("[BenchmarkActor] [stringReturn] receive request: {}", request); + executeSleep(request); + return RandomStringUtils.randomPrint(Optional.ofNullable(request.getResponseSize()).orElse(100)); + } + + private static void executeSleep(BenchmarkRequest request) { if (request.getBlockingMills() != null && request.getBlockingMills() > 0) { CommonUtils.easySleep(request.getBlockingMills()); } - response.setServerCost(System.currentTimeMillis() - startTs); - return response; } diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxTransporter.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxTransporter.java index b291f39c..e4b545fc 100644 --- a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxTransporter.java +++ b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxTransporter.java @@ -2,14 +2,9 @@ package tech.powerjob.remote.http.vertx; import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.Future; -import io.vertx.core.Vertx; import io.vertx.core.http.*; import io.vertx.core.json.JsonObject; -import lombok.SneakyThrows; import tech.powerjob.common.PowerSerializable; -import tech.powerjob.common.request.ServerScheduleJobReq; -import tech.powerjob.remote.framework.base.Address; -import tech.powerjob.remote.framework.base.HandlerLocation; import tech.powerjob.remote.framework.base.RemotingException; import tech.powerjob.remote.framework.base.URL; import tech.powerjob.remote.framework.transporter.Protocol; @@ -49,6 +44,7 @@ public class VertxTransporter implements Transporter { return post(url, request, clz); } + @SuppressWarnings("unchecked") private CompletionStage post(URL url, PowerSerializable request, Class clz) { final String host = url.getAddress().getHost(); final int port = url.getAddress().getPort(); @@ -72,9 +68,18 @@ public class VertxTransporter implements Transporter { )); } - // TODO: 验证无响应的情况 + return httpClientResponse.body().compose(x -> { - return httpClientResponse.body().compose(x -> Future.succeededFuture(x.toJsonObject().mapTo(clz))); + if (clz == null) { + return Future.succeededFuture(null); + } + + if (clz.equals(String.class)) { + return Future.succeededFuture((T) x.toString()); + } + + return Future.succeededFuture(x.toJsonObject().mapTo(clz)); + }); }).toCompletionStage(); } } diff --git a/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java b/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java index 9a25309d..a009293b 100644 --- a/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java +++ b/powerjob-remote/powerjob-remote-impl-http/src/test/java/tech/powerjob/remote/http/HttpVertxCSInitializerTest.java @@ -1,7 +1,6 @@ package tech.powerjob.remote.http; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import tech.powerjob.common.enums.Protocol; @@ -10,8 +9,6 @@ import tech.powerjob.remote.framework.BenchmarkActor; import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.HandlerLocation; import tech.powerjob.remote.framework.base.URL; -import tech.powerjob.remote.framework.cs.CSInitializer; -import tech.powerjob.remote.framework.cs.CSInitializerConfig; import tech.powerjob.remote.framework.engine.EngineConfig; import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.RemoteEngine; @@ -21,8 +18,6 @@ import tech.powerjob.remote.framework.transporter.Transporter; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; -import static org.junit.jupiter.api.Assertions.*; - /** * HttpVertxCSInitializerTest * @@ -47,19 +42,36 @@ class HttpVertxCSInitializerTest { log.info("[HttpVertxCSInitializerTest] engine start up successfully!"); Transporter transporter = engineOutput.getTransporter(); - URL url = new URL() - .setAddress(address) - .setLocation(new HandlerLocation().setMethodPath("standard").setRootPath("benchmark")); - BenchmarkActor.BenchmarkRequest request = new BenchmarkActor.BenchmarkRequest() .setContent("request from test") .setBlockingMills(100) .setResponseSize(10240); + log.info("[HttpVertxCSInitializerTest] test empty request!"); + URL emptyURL = new URL() + .setAddress(address) + .setLocation(new HandlerLocation().setMethodPath("emptyReturn").setRootPath("benchmark")); + transporter.tell(emptyURL, request); + + log.info("[HttpVertxCSInitializerTest] test string request!"); + URL stringURL = new URL() + .setAddress(address) + .setLocation(new HandlerLocation().setMethodPath("stringReturn").setRootPath("benchmark")); + final String strResponse = transporter.ask(stringURL, request, String.class).toCompletableFuture().get(); + log.info("[HttpVertxCSInitializerTest] strResponse: {}", strResponse); + + log.info("[HttpVertxCSInitializerTest] test normal request!"); + URL url = new URL() + .setAddress(address) + .setLocation(new HandlerLocation().setMethodPath("standard").setRootPath("benchmark")); + final CompletionStage benchmarkResponseCompletionStage = transporter.ask(url, request, BenchmarkActor.BenchmarkResponse.class); final BenchmarkActor.BenchmarkResponse response = benchmarkResponseCompletionStage.toCompletableFuture().get(10, TimeUnit.SECONDS); log.info("[HttpVertxCSInitializerTest] response: {}", response); - CommonUtils.easySleep(1000000000); + + + + CommonUtils.easySleep(10000); } } \ No newline at end of file