diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/transporter/Transporter.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/transporter/Transporter.java index be69945e..f66cb1af 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/transporter/Transporter.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/transporter/Transporter.java @@ -32,9 +32,9 @@ public interface Transporter { * ask by request * @param url url * @param request request - * @param executorService thread pool, null is acceptable + * @param clz response type * @return CompletionStage * @throws RemotingException remote exception */ - CompletionStage ask(URL url, PowerSerializable request, ExecutorService executorService) throws RemotingException; + CompletionStage ask(URL url, PowerSerializable request, Class clz) throws RemotingException; } diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java index bac619ca..96d0fd8b 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java @@ -6,6 +6,7 @@ import akka.pattern.Patterns; import com.google.common.collect.Maps; import tech.powerjob.common.PowerSerializable; import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.remote.framework.base.RemotingException; import tech.powerjob.remote.framework.base.ServerType; @@ -63,9 +64,10 @@ public class AkkaTransporter implements Transporter { } @Override - public CompletionStage ask(URL url, PowerSerializable request, ExecutorService executorService) throws RemotingException { + @SuppressWarnings("unchecked") + public CompletionStage ask(URL url, PowerSerializable request, Class clz) throws RemotingException { ActorSelection actorSelection = fetchActorSelection(url); - return Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + return (CompletionStage) Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); } private ActorSelection fetchActorSelection(URL url) { diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxTransporter.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxTransporter.java index eaadd5ab..b291f39c 100644 --- a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxTransporter.java +++ b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/vertx/VertxTransporter.java @@ -17,7 +17,6 @@ import tech.powerjob.remote.framework.transporter.Transporter; import tech.powerjob.remote.http.HttpProtocol; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutorService; /** * VertxTransporter @@ -42,15 +41,15 @@ public class VertxTransporter implements Transporter { @Override public void tell(URL url, PowerSerializable request) { - post(url, request); + post(url, request, null); } @Override - public CompletionStage ask(URL url, PowerSerializable request, ExecutorService executorService) throws RemotingException { - return post(url, request); + public CompletionStage ask(URL url, PowerSerializable request, Class clz) throws RemotingException { + return post(url, request, clz); } - private CompletionStage post(URL url, PowerSerializable request) { + private CompletionStage post(URL url, PowerSerializable request, Class clz) { final String host = url.getAddress().getHost(); final int port = url.getAddress().getPort(); final String path = url.getLocation().toPath(); @@ -72,28 +71,10 @@ public class VertxTransporter implements Transporter { host, port, path, statusCode, httpClientResponse.statusMessage() )); } - return httpClientResponse.body().compose(x -> { - // TODO: 类型转换 - return Future.succeededFuture(x.toJson()); - }); + + // TODO: 验证无响应的情况 + + return httpClientResponse.body().compose(x -> Future.succeededFuture(x.toJsonObject().mapTo(clz))); }).toCompletionStage(); } - - @SneakyThrows - public static void main(String[] args) { - final Vertx vertx = Vertx.vertx(); - final HttpClient hc = Vertx.vertx().createHttpClient(); - VertxTransporter transport = new VertxTransporter(hc); - - ServerScheduleJobReq serverScheduleJobReq = new ServerScheduleJobReq(); - serverScheduleJobReq.setJobId(1234L); - serverScheduleJobReq.setJobParams("asdasdas"); - - URL url = new URL(); - url.setAddress(new Address().setHost("127.0.0.1").setPort(7890)); - url.setLocation(new HandlerLocation().setRootPath("test").setMethodPath("abc")); - - final CompletionStage post = transport.post(url, serverScheduleJobReq); - System.out.println(post.toCompletableFuture().get()); - } }