feat: process empty return in vertx

This commit is contained in:
tjq 2023-01-24 12:46:35 +08:00
parent 55e259bcf7
commit 3bfe58abd2
3 changed files with 57 additions and 21 deletions

View File

@ -9,6 +9,8 @@ import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.actor.Handler; import tech.powerjob.remote.framework.actor.Handler;
import java.util.Optional;
/** /**
* 基准测试 * 基准测试
* *
@ -20,9 +22,9 @@ import tech.powerjob.remote.framework.actor.Handler;
public class BenchmarkActor { public class BenchmarkActor {
@Handler(path = "standard") @Handler(path = "standard")
public BenchmarkResponse processStandardRequest(BenchmarkRequest request) { public BenchmarkResponse standardRequest(BenchmarkRequest request) {
long startTs = System.currentTimeMillis(); long startTs = System.currentTimeMillis();
log.info("[BenchmarkActor] receive request: {}", request); log.info("[BenchmarkActor] [standardRequest] receive request: {}", request);
BenchmarkResponse response = new BenchmarkResponse() BenchmarkResponse response = new BenchmarkResponse()
.setSuccess(true) .setSuccess(true)
.setContent(request.getContent()) .setContent(request.getContent())
@ -31,11 +33,28 @@ public class BenchmarkActor {
if (request.getResponseSize() != null && request.getResponseSize() > 0) { if (request.getResponseSize() != null && request.getResponseSize() > 0) {
response.setExtra(RandomStringUtils.randomPrint(request.getResponseSize())); response.setExtra(RandomStringUtils.randomPrint(request.getResponseSize()));
} }
executeSleep(request);
response.setServerCost(System.currentTimeMillis() - startTs);
return response;
}
@Handler(path = "emptyReturn")
public void emptyReturn(BenchmarkRequest request) {
log.info("[BenchmarkActor] [emptyReturn] receive request: {}", request);
executeSleep(request);
}
@Handler(path = "stringReturn")
public String stringReturn(BenchmarkRequest request) {
log.info("[BenchmarkActor] [stringReturn] receive request: {}", request);
executeSleep(request);
return RandomStringUtils.randomPrint(Optional.ofNullable(request.getResponseSize()).orElse(100));
}
private static void executeSleep(BenchmarkRequest request) {
if (request.getBlockingMills() != null && request.getBlockingMills() > 0) { if (request.getBlockingMills() != null && request.getBlockingMills() > 0) {
CommonUtils.easySleep(request.getBlockingMills()); CommonUtils.easySleep(request.getBlockingMills());
} }
response.setServerCost(System.currentTimeMillis() - startTs);
return response;
} }

View File

@ -2,14 +2,9 @@ package tech.powerjob.remote.http.vertx;
import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.*; import io.vertx.core.http.*;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import lombok.SneakyThrows;
import tech.powerjob.common.PowerSerializable; import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.HandlerLocation;
import tech.powerjob.remote.framework.base.RemotingException; import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.URL; import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.transporter.Protocol; import tech.powerjob.remote.framework.transporter.Protocol;
@ -49,6 +44,7 @@ public class VertxTransporter implements Transporter {
return post(url, request, clz); return post(url, request, clz);
} }
@SuppressWarnings("unchecked")
private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) { private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {
final String host = url.getAddress().getHost(); final String host = url.getAddress().getHost();
final int port = url.getAddress().getPort(); final int port = url.getAddress().getPort();
@ -72,9 +68,18 @@ public class VertxTransporter implements Transporter {
)); ));
} }
// TODO: 验证无响应的情况 return httpClientResponse.body().compose(x -> {
return httpClientResponse.body().compose(x -> Future.succeededFuture(x.toJsonObject().mapTo(clz))); if (clz == null) {
return Future.succeededFuture(null);
}
if (clz.equals(String.class)) {
return Future.succeededFuture((T) x.toString());
}
return Future.succeededFuture(x.toJsonObject().mapTo(clz));
});
}).toCompletionStage(); }).toCompletionStage();
} }
} }

View File

@ -1,7 +1,6 @@
package tech.powerjob.remote.http; package tech.powerjob.remote.http;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import tech.powerjob.common.enums.Protocol; import tech.powerjob.common.enums.Protocol;
@ -10,8 +9,6 @@ import tech.powerjob.remote.framework.BenchmarkActor;
import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.HandlerLocation; import tech.powerjob.remote.framework.base.HandlerLocation;
import tech.powerjob.remote.framework.base.URL; import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.engine.EngineConfig; import tech.powerjob.remote.framework.engine.EngineConfig;
import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.EngineOutput;
import tech.powerjob.remote.framework.engine.RemoteEngine; import tech.powerjob.remote.framework.engine.RemoteEngine;
@ -21,8 +18,6 @@ import tech.powerjob.remote.framework.transporter.Transporter;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.*;
/** /**
* HttpVertxCSInitializerTest * HttpVertxCSInitializerTest
* *
@ -47,19 +42,36 @@ class HttpVertxCSInitializerTest {
log.info("[HttpVertxCSInitializerTest] engine start up successfully!"); log.info("[HttpVertxCSInitializerTest] engine start up successfully!");
Transporter transporter = engineOutput.getTransporter(); Transporter transporter = engineOutput.getTransporter();
URL url = new URL()
.setAddress(address)
.setLocation(new HandlerLocation().setMethodPath("standard").setRootPath("benchmark"));
BenchmarkActor.BenchmarkRequest request = new BenchmarkActor.BenchmarkRequest() BenchmarkActor.BenchmarkRequest request = new BenchmarkActor.BenchmarkRequest()
.setContent("request from test") .setContent("request from test")
.setBlockingMills(100) .setBlockingMills(100)
.setResponseSize(10240); .setResponseSize(10240);
log.info("[HttpVertxCSInitializerTest] test empty request!");
URL emptyURL = new URL()
.setAddress(address)
.setLocation(new HandlerLocation().setMethodPath("emptyReturn").setRootPath("benchmark"));
transporter.tell(emptyURL, request);
log.info("[HttpVertxCSInitializerTest] test string request!");
URL stringURL = new URL()
.setAddress(address)
.setLocation(new HandlerLocation().setMethodPath("stringReturn").setRootPath("benchmark"));
final String strResponse = transporter.ask(stringURL, request, String.class).toCompletableFuture().get();
log.info("[HttpVertxCSInitializerTest] strResponse: {}", strResponse);
log.info("[HttpVertxCSInitializerTest] test normal request!");
URL url = new URL()
.setAddress(address)
.setLocation(new HandlerLocation().setMethodPath("standard").setRootPath("benchmark"));
final CompletionStage<BenchmarkActor.BenchmarkResponse> benchmarkResponseCompletionStage = transporter.ask(url, request, BenchmarkActor.BenchmarkResponse.class); final CompletionStage<BenchmarkActor.BenchmarkResponse> benchmarkResponseCompletionStage = transporter.ask(url, request, BenchmarkActor.BenchmarkResponse.class);
final BenchmarkActor.BenchmarkResponse response = benchmarkResponseCompletionStage.toCompletableFuture().get(10, TimeUnit.SECONDS); final BenchmarkActor.BenchmarkResponse response = benchmarkResponseCompletionStage.toCompletableFuture().get(10, TimeUnit.SECONDS);
log.info("[HttpVertxCSInitializerTest] response: {}", response); log.info("[HttpVertxCSInitializerTest] response: {}", response);
CommonUtils.easySleep(1000000000);
CommonUtils.easySleep(10000);
} }
} }