[dev] ensure OMS-Client's HA

This commit is contained in:
tjq 2020-05-15 08:20:27 +08:00
parent 2c030c23cf
commit 48ed784ea3
2 changed files with 144 additions and 36 deletions

View File

@ -8,12 +8,14 @@ import com.github.kfcfans.oms.common.response.JobInfoDTO;
import com.github.kfcfans.oms.common.response.ResultDTO;
import com.github.kfcfans.oms.common.utils.HttpUtils;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import okhttp3.FormBody;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Objects;
/**
@ -27,40 +29,57 @@ import java.util.Objects;
@SuppressWarnings("rawtypes, unchecked")
public class OhMyClient {
private String domain;
private Long appId;
private String currentAddress;
private List<String> allAddress;
private static final String URL_PATTERN = "http://%s%s%s";
/**
* 初始化 OhMyClient 客户端
* @param domain 服务器地址eg:192.168.1.1:7700选定主机无HA保证 / www.oms-server.com内网域名自行完成DNS & Proxy
* @param domain www.oms-server.com内网域名自行完成DNS & Proxy
* @param appName 负责的应用名称
*/
public OhMyClient(String domain, String appName) throws Exception {
public OhMyClient(String domain, String appName) {
this(Lists.newArrayList(domain), appName);
}
Objects.requireNonNull(domain, "domain can't be null!");
/**
* 初始化 OhMyClient 客户端
* @param addressList IP:Port 列表
* @param appName 负责的应用名称
*/
public OhMyClient(List<String> addressList, String appName) {
Objects.requireNonNull(addressList, "domain can't be null!");
Objects.requireNonNull(appName, "appName can't be null");
this.domain = domain;
// 验证 appName 可用性 & server可用性
String url = getUrl(OpenAPIConstant.ASSERT) + "?appName=" + appName;
allAddress = addressList;
for (String addr : addressList) {
String url = getUrl(addr, addr) + "?appName=" + appName;
try {
String result = HttpUtils.get(url);
if (StringUtils.isNotEmpty(result)) {
ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
appId = Long.parseLong(resultDTO.getData().toString());
}else {
throw new OmsOpenApiException(resultDTO.getMessage());
currentAddress = addr;
break;
}
}
log.info("[OhMyClient] {}'s client bootstrap successfully.", appName);
}catch (Exception ignore) {
}
}
if (StringUtils.isEmpty(currentAddress)) {
throw new OmsOpenApiException("no server available");
}
log.info("[OhMyClient] {}'s oms-client bootstrap successfully.", appName);
}
private String getUrl(String path) {
return String.format(URL_PATTERN, domain, OpenAPIConstant.WEB_PATH, path);
private static String getUrl(String path, String address) {
return String.format(URL_PATTERN, address, OpenAPIConstant.WEB_PATH, path);
}
/* ************* Job 区 ************* */
@ -74,10 +93,9 @@ public class OhMyClient {
public ResultDTO<Long> saveJob(SaveJobInfoRequest request) throws Exception {
request.setAppId(appId);
String url = getUrl(OpenAPIConstant.SAVE_JOB);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JsonUtils.toJSONStringUnsafe(request);
String post = HttpUtils.post(url, RequestBody.create(json, jsonType));
String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(json, jsonType));
return JsonUtils.parseObject(post, ResultDTO.class);
}
@ -88,12 +106,11 @@ public class OhMyClient {
* @throws Exception 异常
*/
public ResultDTO<JobInfoDTO> fetchJob(Long jobId) throws Exception {
String url = getUrl(OpenAPIConstant.FETCH_JOB);
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = HttpUtils.post(url, body);
String post = postHA(OpenAPIConstant.FETCH_JOB, body);
return JsonUtils.parseObject(post, ResultDTO.class);
}
@ -104,12 +121,11 @@ public class OhMyClient {
* @throws Exception 异常
*/
public ResultDTO<Void> disableJob(Long jobId) throws Exception {
String url = getUrl(OpenAPIConstant.DISABLE_JOB);
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = HttpUtils.post(url, body);
String post = postHA(OpenAPIConstant.DISABLE_JOB, body);
return JsonUtils.parseObject(post, ResultDTO.class);
}
@ -120,12 +136,11 @@ public class OhMyClient {
* @throws Exception 异常
*/
public ResultDTO<Void> enableJob(Long jobId) throws Exception {
String url = getUrl(OpenAPIConstant.ENABLE_JOB);
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = HttpUtils.post(url, body);
String post = postHA(OpenAPIConstant.ENABLE_JOB, body);
return JsonUtils.parseObject(post, ResultDTO.class);
}
@ -136,12 +151,11 @@ public class OhMyClient {
* @throws Exception 异常
*/
public ResultDTO<Void> deleteJob(Long jobId) throws Exception {
String url = getUrl(OpenAPIConstant.DELETE_JOB);
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = HttpUtils.post(url, body);
String post = postHA(OpenAPIConstant.DELETE_JOB, body);
return JsonUtils.parseObject(post, ResultDTO.class);
}
@ -153,7 +167,6 @@ public class OhMyClient {
* @throws Exception 异常
*/
public ResultDTO<Long> runJob(Long jobId, String instanceParams) throws Exception {
String url = getUrl(OpenAPIConstant.RUN_JOB);
FormBody.Builder builder = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString());
@ -161,7 +174,7 @@ public class OhMyClient {
if (StringUtils.isNotEmpty(instanceParams)) {
builder.add("instanceParams", instanceParams);
}
String post = HttpUtils.post(url, builder.build());
String post = postHA(OpenAPIConstant.RUN_JOB, builder.build());
return JsonUtils.parseObject(post, ResultDTO.class);
}
public ResultDTO<Long> runJob(Long jobId) throws Exception {
@ -176,12 +189,11 @@ public class OhMyClient {
* @throws Exception 异常
*/
public ResultDTO<Void> stopInstance(Long instanceId) throws Exception {
String url = getUrl(OpenAPIConstant.STOP_INSTANCE);
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = HttpUtils.post(url, body);
String post = postHA(OpenAPIConstant.STOP_INSTANCE, body);
return JsonUtils.parseObject(post, ResultDTO.class);
}
@ -192,11 +204,10 @@ public class OhMyClient {
* @throws Exception 异常
*/
public ResultDTO<Integer> fetchInstanceStatus(Long instanceId) throws Exception {
String url = getUrl(OpenAPIConstant.FETCH_INSTANCE_STATUS);
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.build();
String post = HttpUtils.post(url, body);
String post = postHA(OpenAPIConstant.FETCH_INSTANCE_STATUS, body);
return JsonUtils.parseObject(post, ResultDTO.class);
}
@ -207,11 +218,38 @@ public class OhMyClient {
* @throws Exception 潜在的异常
*/
public ResultDTO<InstanceInfoDTO> fetchInstanceInfo(Long instanceId) throws Exception {
String url = getUrl(OpenAPIConstant.FETCH_INSTANCE_INFO);
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.build();
String post = HttpUtils.post(url, body);
String post = postHA(OpenAPIConstant.FETCH_INSTANCE_INFO, body);
return JsonUtils.parseObject(post, ResultDTO.class);
}
private String postHA(String path, RequestBody requestBody) {
// 先尝试默认地址
try {
String res = HttpUtils.post(getUrl(path, currentAddress), requestBody);
if (StringUtils.isNotEmpty(res)) {
return res;
}
}catch (Exception ignore) {
}
// 失败开始重试
for (String addr : allAddress) {
try {
String res = HttpUtils.post(getUrl(path, addr), requestBody);
if (StringUtils.isNotEmpty(res)) {
log.warn("[OhMyClient] server change: from({}) -> to({}).", currentAddress, addr);
currentAddress = addr;
return res;
}
}catch (Exception ignore) {
}
}
log.error("[OhMyClient] no server available in {}.", allAddress);
throw new OmsOpenApiException("no server available");
}
}

View File

@ -0,0 +1,70 @@
package com.github.kfcfans.oms.server.mr;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.TaskResult;
import com.github.kfcfans.oms.worker.core.processor.sdk.MapReduceProcessor;
import com.google.common.collect.Lists;
import java.util.List;
/**
* 模拟 DAG 的处理器在正式提供DAG支持前可用该方法代替
*
* ROOT -> A -> B -> REDUCE
* -> C
*
* @author tjq
* @since 2020/5/15
*/
public class DAGSimulationProcessor extends MapReduceProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
if (isRootTask()) {
// L1. 执行根任务
// 执行完毕后产生子任务 A需要传递的参数可以作为 TaskA 的属性进行传递
TaskA taskA = new TaskA();
return map(Lists.newArrayList(taskA), "LEVEL1_TASK_A");
}
if (context.getSubTask() instanceof TaskA) {
// L2. 执行A任务
// 执行完成后产生子任务 BC并行执行
TaskB taskB = new TaskB();
TaskC taskC = new TaskC();
return map(Lists.newArrayList(taskB, taskC), "LEVEL2_TASK_BC");
}
if (context.getSubTask() instanceof TaskB) {
// L3. 执行B任务
return new ProcessResult(true, "xxx");
}
if (context.getSubTask() instanceof TaskC) {
// L3. 执行C任务
return new ProcessResult(true, "xxx");
}
return new ProcessResult(false, "UNKNOWN_TYPE_OF_SUB_TASK");
}
@Override
public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
// L4. 执行最终 Reduce 任务taskResults保存了之前所有任务的结果
taskResults.forEach(taskResult -> {
// do something...
});
return new ProcessResult(true, "reduce success");
}
private static class TaskA {
}
private static class TaskB {
}
private static class TaskC {
}
}