feat: framwork api

This commit is contained in:
tjq 2023-01-01 20:25:11 +08:00
parent 87a1a1d7c1
commit 2c31e81c5f
3 changed files with 14 additions and 31 deletions

View File

@ -32,9 +32,9 @@ public interface Transporter {
* ask by request
* @param url url
* @param request request
* @param executorService thread pool, null is acceptable
* @param clz response type
* @return CompletionStage
* @throws RemotingException remote exception
*/
CompletionStage<Object> ask(URL url, PowerSerializable request, ExecutorService executorService) throws RemotingException;
<T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException;
}

View File

@ -6,6 +6,7 @@ import akka.pattern.Patterns;
import com.google.common.collect.Maps;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.ServerType;
@ -63,9 +64,10 @@ public class AkkaTransporter implements Transporter {
}
@Override
public CompletionStage<Object> ask(URL url, PowerSerializable request, ExecutorService executorService) throws RemotingException {
@SuppressWarnings("unchecked")
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
ActorSelection actorSelection = fetchActorSelection(url);
return Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
return (CompletionStage<T>) Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
}
private ActorSelection fetchActorSelection(URL url) {

View File

@ -17,7 +17,6 @@ import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.remote.http.HttpProtocol;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
/**
* VertxTransporter
@ -42,15 +41,15 @@ public class VertxTransporter implements Transporter {
@Override
public void tell(URL url, PowerSerializable request) {
post(url, request);
post(url, request, null);
}
@Override
public CompletionStage<Object> ask(URL url, PowerSerializable request, ExecutorService executorService) throws RemotingException {
return post(url, request);
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
return post(url, request, clz);
}
private CompletionStage<Object> post(URL url, PowerSerializable request) {
private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {
final String host = url.getAddress().getHost();
final int port = url.getAddress().getPort();
final String path = url.getLocation().toPath();
@ -72,28 +71,10 @@ public class VertxTransporter implements Transporter {
host, port, path, statusCode, httpClientResponse.statusMessage()
));
}
return httpClientResponse.body().compose(x -> {
// TODO: 类型转换
return Future.succeededFuture(x.toJson());
});
// TODO: 验证无响应的情况
return httpClientResponse.body().compose(x -> Future.succeededFuture(x.toJsonObject().mapTo(clz)));
}).toCompletionStage();
}
@SneakyThrows
public static void main(String[] args) {
final Vertx vertx = Vertx.vertx();
final HttpClient hc = Vertx.vertx().createHttpClient();
VertxTransporter transport = new VertxTransporter(hc);
ServerScheduleJobReq serverScheduleJobReq = new ServerScheduleJobReq();
serverScheduleJobReq.setJobId(1234L);
serverScheduleJobReq.setJobParams("asdasdas");
URL url = new URL();
url.setAddress(new Address().setHost("127.0.0.1").setPort(7890));
url.setLocation(new HandlerLocation().setRootPath("test").setMethodPath("abc"));
final CompletionStage<Object> post = transport.post(url, serverScheduleJobReq);
System.out.println(post.toCompletableFuture().get());
}
}