feat: add HttpTransporter #209

This commit is contained in:
tjq 2021-02-08 20:48:03 +08:00
parent 57501075de
commit 0f1e17e862
8 changed files with 127 additions and 0 deletions

View File

@ -9,4 +9,12 @@ import java.io.Serializable;
* @since 2020/4/16
*/
public interface OmsSerializable extends Serializable {
/**
* request path for http or other protocol, like 'stopInstance'
* @return null for non-http request object or no-null path for http request needed object
*/
default String path() {
return null;
}
}

View File

@ -0,0 +1,18 @@
package com.github.kfcfans.powerjob.common;
/**
* HttpProtocolConstant
*
* @author tjq
* @since 2021/2/8
*/
public class ProtocolConstant {
public static final String SERVER_PATH_HEARTBEAT = "heartbeat";
public static final String SERVER_PATH_STATUS_REPORT = "statusReport";
public static final String SERVER_PATH_LOG_REPORT = "logReport";
public static final String WORKER_PATH_DISPATCH_JOB = "/worker/dispatchJob";
public static final String WORKER_PATH_STOP_INSTANCE = "/worker/stopInstance";
public static final String WORKER_PATH_QUERY_INSTANCE_INFO = "/worker/queryInstanceInfo";
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.common.request;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.ProtocolConstant;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@ -16,4 +17,9 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class ServerQueryInstanceStatusReq implements OmsSerializable {
private Long instanceId;
@Override
public String path() {
return ProtocolConstant.WORKER_PATH_QUERY_INSTANCE_INFO;
}
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.common.request;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.ProtocolConstant;
import lombok.Data;
import java.util.List;
@ -71,4 +72,8 @@ public class ServerScheduleJobReq implements OmsSerializable {
// 最大同时运行任务数默认 1
private Integer maxInstanceNum;
@Override
public String path() {
return ProtocolConstant.WORKER_PATH_DISPATCH_JOB;
}
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.common.request;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.ProtocolConstant;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@ -17,4 +18,9 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class ServerStopInstanceReq implements OmsSerializable {
private Long instanceId;
@Override
public String path() {
return ProtocolConstant.WORKER_PATH_STOP_INSTANCE;
}
}

View File

@ -18,6 +18,7 @@ limitations under the License.
import com.github.kfcfans.powerjob.common.PowerJobDKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import java.io.IOException;
import java.net.*;
@ -304,6 +305,11 @@ public class NetUtils {
return Objects.equals(networkInterface.getName(), preferredNetworkInterface);
}
public static Pair<String, Integer> splitAddress2IpAndPort(String address) {
String[] split = address.split(":");
return Pair.of(split[0], Integer.valueOf(split[1]));
}
static boolean ignoreInterfaceByConfig(String interfaceName) {
String regex = System.getProperty(PowerJobDKey.IGNORED_NETWORK_INTERFACE_REGEX);
if (StringUtils.isBlank(regex)) {

View File

@ -190,6 +190,11 @@
<artifactId>vertx-web</artifactId>
<version>${vertx-web.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
<version>${vertx-web.version}</version>
</dependency>
<!-- swagger2 -->

View File

@ -0,0 +1,73 @@
package com.github.kfcfans.powerjob.server.transport.impl;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.Protocol;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.transport.Transporter;
import com.github.kfcfans.powerjob.server.transport.starter.VertXStarter;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* http transporter powered by vert.x
*
* @author tjq
* @since 2021/2/8
*/
@Slf4j
@Service
public class HttpTransporter implements Transporter {
private final WebClient webClient;
public HttpTransporter() {
WebClientOptions options = new WebClientOptions()
.setKeepAlive(false)
.setConnectTimeout((int) RemoteConstant.DEFAULT_TIMEOUT_MS);
webClient = WebClient.create(Vertx.vertx(), options);
}
@Override
public Protocol getProtocol() {
return Protocol.HTTP;
}
@Override
public String getAddress() {
return VertXStarter.getAddress();
}
@Override
public void tell(String address, OmsSerializable object) {
postRequest(address, object);
}
@Override
public AskResponse ask(String address, OmsSerializable object) throws Exception {
CompletableFuture<HttpResponse<Buffer>> future = postRequest(address, object).toCompletionStage().toCompletableFuture();
HttpResponse<Buffer> httpResponse = future.get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
return httpResponse.bodyAsJson(AskResponse.class);
}
private Future<HttpResponse<Buffer>> postRequest(String address, OmsSerializable object) {
Pair<String, Integer> ipAndPort = NetUtils.splitAddress2IpAndPort(address);
String ip = ipAndPort.getLeft();
int port = ipAndPort.getRight();
return webClient.post(port, ip, object.path())
.sendJson(object)
.onSuccess(res -> log.info("[HttpTransporter] send request to {}{} successfully: {}, response: {}", address, object.path(), object, res))
.onFailure(t -> log.warn("[HttpTransporter] send request to {}{} failed: {}", address, object.path(), object, t));
}
}