From 0f1e17e8627e57be1666c3ab908e9ecf6cc2253c Mon Sep 17 00:00:00 2001 From: tjq Date: Mon, 8 Feb 2021 20:48:03 +0800 Subject: [PATCH] feat: add HttpTransporter #209 --- .../powerjob/common/OmsSerializable.java | 8 ++ .../powerjob/common/ProtocolConstant.java | 18 +++++ .../request/ServerQueryInstanceStatusReq.java | 6 ++ .../common/request/ServerScheduleJobReq.java | 5 ++ .../common/request/ServerStopInstanceReq.java | 6 ++ .../powerjob/common/utils/NetUtils.java | 6 ++ powerjob-server/pom.xml | 5 ++ .../transport/impl/HttpTransporter.java | 73 +++++++++++++++++++ 8 files changed, 127 insertions(+) create mode 100644 powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/ProtocolConstant.java create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/impl/HttpTransporter.java diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsSerializable.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsSerializable.java index aa132dad..0c90fde1 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsSerializable.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsSerializable.java @@ -9,4 +9,12 @@ import java.io.Serializable; * @since 2020/4/16 */ public interface OmsSerializable extends Serializable { + + /** + * request path for http or other protocol, like 'stopInstance' + * @return null for non-http request object or no-null path for http request needed object + */ + default String path() { + return null; + } } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/ProtocolConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/ProtocolConstant.java new file mode 100644 index 00000000..f3e6ee4c --- /dev/null +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/ProtocolConstant.java @@ -0,0 +1,18 @@ +package com.github.kfcfans.powerjob.common; + +/** + * HttpProtocolConstant + * + * @author tjq + * @since 2021/2/8 + */ +public class ProtocolConstant { + + public static final String SERVER_PATH_HEARTBEAT = "heartbeat"; + public static final String SERVER_PATH_STATUS_REPORT = "statusReport"; + public static final String SERVER_PATH_LOG_REPORT = "logReport"; + + public static final String WORKER_PATH_DISPATCH_JOB = "/worker/dispatchJob"; + public static final String WORKER_PATH_STOP_INSTANCE = "/worker/stopInstance"; + public static final String WORKER_PATH_QUERY_INSTANCE_INFO = "/worker/queryInstanceInfo"; +} diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerQueryInstanceStatusReq.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerQueryInstanceStatusReq.java index 644ab5f3..c8ccf788 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerQueryInstanceStatusReq.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerQueryInstanceStatusReq.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.common.request; import com.github.kfcfans.powerjob.common.OmsSerializable; +import com.github.kfcfans.powerjob.common.ProtocolConstant; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -16,4 +17,9 @@ import lombok.NoArgsConstructor; @AllArgsConstructor public class ServerQueryInstanceStatusReq implements OmsSerializable { private Long instanceId; + + @Override + public String path() { + return ProtocolConstant.WORKER_PATH_QUERY_INSTANCE_INFO; + } } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerScheduleJobReq.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerScheduleJobReq.java index 24491b01..72c45392 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerScheduleJobReq.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerScheduleJobReq.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.common.request; import com.github.kfcfans.powerjob.common.OmsSerializable; +import com.github.kfcfans.powerjob.common.ProtocolConstant; import lombok.Data; import java.util.List; @@ -71,4 +72,8 @@ public class ServerScheduleJobReq implements OmsSerializable { // 最大同时运行任务数,默认 1 private Integer maxInstanceNum; + @Override + public String path() { + return ProtocolConstant.WORKER_PATH_DISPATCH_JOB; + } } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerStopInstanceReq.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerStopInstanceReq.java index 2361f55d..c603828b 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerStopInstanceReq.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerStopInstanceReq.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.common.request; import com.github.kfcfans.powerjob.common.OmsSerializable; +import com.github.kfcfans.powerjob.common.ProtocolConstant; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -17,4 +18,9 @@ import lombok.NoArgsConstructor; @AllArgsConstructor public class ServerStopInstanceReq implements OmsSerializable { private Long instanceId; + + @Override + public String path() { + return ProtocolConstant.WORKER_PATH_STOP_INSTANCE; + } } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/NetUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/NetUtils.java index 8b0c5105..453932b0 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/NetUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/NetUtils.java @@ -18,6 +18,7 @@ limitations under the License. import com.github.kfcfans.powerjob.common.PowerJobDKey; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import java.io.IOException; import java.net.*; @@ -304,6 +305,11 @@ public class NetUtils { return Objects.equals(networkInterface.getName(), preferredNetworkInterface); } + public static Pair splitAddress2IpAndPort(String address) { + String[] split = address.split(":"); + return Pair.of(split[0], Integer.valueOf(split[1])); + } + static boolean ignoreInterfaceByConfig(String interfaceName) { String regex = System.getProperty(PowerJobDKey.IGNORED_NETWORK_INTERFACE_REGEX); if (StringUtils.isBlank(regex)) { diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index e0cdb822..48e42d19 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -190,6 +190,11 @@ vertx-web ${vertx-web.version} + + io.vertx + vertx-web-client + ${vertx-web.version} + diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/impl/HttpTransporter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/impl/HttpTransporter.java new file mode 100644 index 00000000..38de6f6c --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/transport/impl/HttpTransporter.java @@ -0,0 +1,73 @@ +package com.github.kfcfans.powerjob.server.transport.impl; + +import com.github.kfcfans.powerjob.common.OmsSerializable; +import com.github.kfcfans.powerjob.common.Protocol; +import com.github.kfcfans.powerjob.common.RemoteConstant; +import com.github.kfcfans.powerjob.common.response.AskResponse; +import com.github.kfcfans.powerjob.common.utils.NetUtils; +import com.github.kfcfans.powerjob.server.transport.Transporter; +import com.github.kfcfans.powerjob.server.transport.starter.VertXStarter; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; +import org.springframework.stereotype.Service; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * http transporter powered by vert.x + * + * @author tjq + * @since 2021/2/8 + */ +@Slf4j +@Service +public class HttpTransporter implements Transporter { + + private final WebClient webClient; + + public HttpTransporter() { + WebClientOptions options = new WebClientOptions() + .setKeepAlive(false) + .setConnectTimeout((int) RemoteConstant.DEFAULT_TIMEOUT_MS); + webClient = WebClient.create(Vertx.vertx(), options); + } + + @Override + public Protocol getProtocol() { + return Protocol.HTTP; + } + + @Override + public String getAddress() { + return VertXStarter.getAddress(); + } + + @Override + public void tell(String address, OmsSerializable object) { + postRequest(address, object); + } + + @Override + public AskResponse ask(String address, OmsSerializable object) throws Exception { + CompletableFuture> future = postRequest(address, object).toCompletionStage().toCompletableFuture(); + HttpResponse httpResponse = future.get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + return httpResponse.bodyAsJson(AskResponse.class); + } + + private Future> postRequest(String address, OmsSerializable object) { + Pair ipAndPort = NetUtils.splitAddress2IpAndPort(address); + String ip = ipAndPort.getLeft(); + int port = ipAndPort.getRight(); + return webClient.post(port, ip, object.path()) + .sendJson(object) + .onSuccess(res -> log.info("[HttpTransporter] send request to {}{} successfully: {}, response: {}", address, object.path(), object, res)) + .onFailure(t -> log.warn("[HttpTransporter] send request to {}{} failed: {}", address, object.path(), object, t)); + } +}