From 48ed784ea3d47a725dd721266c19b9b429af0f04 Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 15 May 2020 08:20:27 +0800 Subject: [PATCH] [dev] ensure OMS-Client's HA --- .../github/kfcfans/oms/client/OhMyClient.java | 110 ++++++++++++------ .../oms/server/mr/DAGSimulationProcessor.java | 70 +++++++++++ 2 files changed, 144 insertions(+), 36 deletions(-) create mode 100644 oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/mr/DAGSimulationProcessor.java diff --git a/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java b/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java index 9f64cb2e..eed52df1 100644 --- a/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java +++ b/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java @@ -8,12 +8,14 @@ import com.github.kfcfans.oms.common.response.JobInfoDTO; import com.github.kfcfans.oms.common.response.ResultDTO; import com.github.kfcfans.oms.common.utils.HttpUtils; import com.github.kfcfans.oms.common.utils.JsonUtils; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import okhttp3.FormBody; import okhttp3.MediaType; import okhttp3.RequestBody; import org.apache.commons.lang3.StringUtils; +import java.util.List; import java.util.Objects; /** @@ -27,40 +29,57 @@ import java.util.Objects; @SuppressWarnings("rawtypes, unchecked") public class OhMyClient { - private String domain; private Long appId; + private String currentAddress; + private List allAddress; private static final String URL_PATTERN = "http://%s%s%s"; /** * 初始化 OhMyClient 客户端 - * @param domain 服务器地址,eg:192.168.1.1:7700(选定主机,无HA保证) / www.oms-server.com(内网域名,自行完成DNS & Proxy) + * @param domain www.oms-server.com(内网域名,自行完成DNS & Proxy) * @param appName 负责的应用名称 */ - public OhMyClient(String domain, String appName) throws Exception { + public OhMyClient(String domain, String appName) { + this(Lists.newArrayList(domain), appName); + } - Objects.requireNonNull(domain, "domain can't be null!"); + /** + * 初始化 OhMyClient 客户端 + * @param addressList IP:Port 列表 + * @param appName 负责的应用名称 + */ + public OhMyClient(List addressList, String appName) { + + Objects.requireNonNull(addressList, "domain can't be null!"); Objects.requireNonNull(appName, "appName can't be null"); - this.domain = domain; - - // 验证 appName 可用性 & server可用性 - String url = getUrl(OpenAPIConstant.ASSERT) + "?appName=" + appName; - String result = HttpUtils.get(url); - if (StringUtils.isNotEmpty(result)) { - ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class); - if (resultDTO.isSuccess()) { - appId = Long.parseLong(resultDTO.getData().toString()); - }else { - throw new OmsOpenApiException(resultDTO.getMessage()); + allAddress = addressList; + for (String addr : addressList) { + String url = getUrl(addr, addr) + "?appName=" + appName; + try { + String result = HttpUtils.get(url); + if (StringUtils.isNotEmpty(result)) { + ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class); + if (resultDTO.isSuccess()) { + appId = Long.parseLong(resultDTO.getData().toString()); + currentAddress = addr; + break; + } + } + }catch (Exception ignore) { } } - log.info("[OhMyClient] {}'s client bootstrap successfully.", appName); + + if (StringUtils.isEmpty(currentAddress)) { + throw new OmsOpenApiException("no server available"); + } + log.info("[OhMyClient] {}'s oms-client bootstrap successfully.", appName); } - private String getUrl(String path) { - return String.format(URL_PATTERN, domain, OpenAPIConstant.WEB_PATH, path); + private static String getUrl(String path, String address) { + return String.format(URL_PATTERN, address, OpenAPIConstant.WEB_PATH, path); } /* ************* Job 区 ************* */ @@ -74,10 +93,9 @@ public class OhMyClient { public ResultDTO saveJob(SaveJobInfoRequest request) throws Exception { request.setAppId(appId); - String url = getUrl(OpenAPIConstant.SAVE_JOB); MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); String json = JsonUtils.toJSONStringUnsafe(request); - String post = HttpUtils.post(url, RequestBody.create(json, jsonType)); + String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(json, jsonType)); return JsonUtils.parseObject(post, ResultDTO.class); } @@ -88,12 +106,11 @@ public class OhMyClient { * @throws Exception 异常 */ public ResultDTO fetchJob(Long jobId) throws Exception { - String url = getUrl(OpenAPIConstant.FETCH_JOB); RequestBody body = new FormBody.Builder() .add("jobId", jobId.toString()) .add("appId", appId.toString()) .build(); - String post = HttpUtils.post(url, body); + String post = postHA(OpenAPIConstant.FETCH_JOB, body); return JsonUtils.parseObject(post, ResultDTO.class); } @@ -104,12 +121,11 @@ public class OhMyClient { * @throws Exception 异常 */ public ResultDTO disableJob(Long jobId) throws Exception { - String url = getUrl(OpenAPIConstant.DISABLE_JOB); RequestBody body = new FormBody.Builder() .add("jobId", jobId.toString()) .add("appId", appId.toString()) .build(); - String post = HttpUtils.post(url, body); + String post = postHA(OpenAPIConstant.DISABLE_JOB, body); return JsonUtils.parseObject(post, ResultDTO.class); } @@ -120,12 +136,11 @@ public class OhMyClient { * @throws Exception 异常 */ public ResultDTO enableJob(Long jobId) throws Exception { - String url = getUrl(OpenAPIConstant.ENABLE_JOB); RequestBody body = new FormBody.Builder() .add("jobId", jobId.toString()) .add("appId", appId.toString()) .build(); - String post = HttpUtils.post(url, body); + String post = postHA(OpenAPIConstant.ENABLE_JOB, body); return JsonUtils.parseObject(post, ResultDTO.class); } @@ -136,12 +151,11 @@ public class OhMyClient { * @throws Exception 异常 */ public ResultDTO deleteJob(Long jobId) throws Exception { - String url = getUrl(OpenAPIConstant.DELETE_JOB); RequestBody body = new FormBody.Builder() .add("jobId", jobId.toString()) .add("appId", appId.toString()) .build(); - String post = HttpUtils.post(url, body); + String post = postHA(OpenAPIConstant.DELETE_JOB, body); return JsonUtils.parseObject(post, ResultDTO.class); } @@ -153,7 +167,6 @@ public class OhMyClient { * @throws Exception 异常 */ public ResultDTO runJob(Long jobId, String instanceParams) throws Exception { - String url = getUrl(OpenAPIConstant.RUN_JOB); FormBody.Builder builder = new FormBody.Builder() .add("jobId", jobId.toString()) .add("appId", appId.toString()); @@ -161,7 +174,7 @@ public class OhMyClient { if (StringUtils.isNotEmpty(instanceParams)) { builder.add("instanceParams", instanceParams); } - String post = HttpUtils.post(url, builder.build()); + String post = postHA(OpenAPIConstant.RUN_JOB, builder.build()); return JsonUtils.parseObject(post, ResultDTO.class); } public ResultDTO runJob(Long jobId) throws Exception { @@ -176,12 +189,11 @@ public class OhMyClient { * @throws Exception 异常 */ public ResultDTO stopInstance(Long instanceId) throws Exception { - String url = getUrl(OpenAPIConstant.STOP_INSTANCE); RequestBody body = new FormBody.Builder() .add("instanceId", instanceId.toString()) .add("appId", appId.toString()) .build(); - String post = HttpUtils.post(url, body); + String post = postHA(OpenAPIConstant.STOP_INSTANCE, body); return JsonUtils.parseObject(post, ResultDTO.class); } @@ -192,11 +204,10 @@ public class OhMyClient { * @throws Exception 异常 */ public ResultDTO fetchInstanceStatus(Long instanceId) throws Exception { - String url = getUrl(OpenAPIConstant.FETCH_INSTANCE_STATUS); RequestBody body = new FormBody.Builder() .add("instanceId", instanceId.toString()) .build(); - String post = HttpUtils.post(url, body); + String post = postHA(OpenAPIConstant.FETCH_INSTANCE_STATUS, body); return JsonUtils.parseObject(post, ResultDTO.class); } @@ -207,11 +218,38 @@ public class OhMyClient { * @throws Exception 潜在的异常 */ public ResultDTO fetchInstanceInfo(Long instanceId) throws Exception { - String url = getUrl(OpenAPIConstant.FETCH_INSTANCE_INFO); RequestBody body = new FormBody.Builder() .add("instanceId", instanceId.toString()) .build(); - String post = HttpUtils.post(url, body); + String post = postHA(OpenAPIConstant.FETCH_INSTANCE_INFO, body); return JsonUtils.parseObject(post, ResultDTO.class); } + + private String postHA(String path, RequestBody requestBody) { + + // 先尝试默认地址 + try { + String res = HttpUtils.post(getUrl(path, currentAddress), requestBody); + if (StringUtils.isNotEmpty(res)) { + return res; + } + }catch (Exception ignore) { + } + + // 失败,开始重试 + for (String addr : allAddress) { + try { + String res = HttpUtils.post(getUrl(path, addr), requestBody); + if (StringUtils.isNotEmpty(res)) { + log.warn("[OhMyClient] server change: from({}) -> to({}).", currentAddress, addr); + currentAddress = addr; + return res; + } + }catch (Exception ignore) { + } + } + + log.error("[OhMyClient] no server available in {}.", allAddress); + throw new OmsOpenApiException("no server available"); + } } diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/mr/DAGSimulationProcessor.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/mr/DAGSimulationProcessor.java new file mode 100644 index 00000000..fcf88acc --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/mr/DAGSimulationProcessor.java @@ -0,0 +1,70 @@ +package com.github.kfcfans.oms.server.mr; + +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.TaskResult; +import com.github.kfcfans.oms.worker.core.processor.sdk.MapReduceProcessor; +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * 模拟 DAG 的处理器(在正式提供DAG支持前,可用该方法代替) + * + * ROOT -> A -> B -> REDUCE + * -> C + * + * @author tjq + * @since 2020/5/15 + */ +public class DAGSimulationProcessor extends MapReduceProcessor { + + @Override + public ProcessResult process(TaskContext context) throws Exception { + + if (isRootTask()) { + // L1. 执行根任务 + + // 执行完毕后产生子任务 A,需要传递的参数可以作为 TaskA 的属性进行传递 + TaskA taskA = new TaskA(); + return map(Lists.newArrayList(taskA), "LEVEL1_TASK_A"); + } + + if (context.getSubTask() instanceof TaskA) { + // L2. 执行A任务 + + // 执行完成后产生子任务 B,C(并行执行) + TaskB taskB = new TaskB(); + TaskC taskC = new TaskC(); + return map(Lists.newArrayList(taskB, taskC), "LEVEL2_TASK_BC"); + } + + if (context.getSubTask() instanceof TaskB) { + // L3. 执行B任务 + return new ProcessResult(true, "xxx"); + } + if (context.getSubTask() instanceof TaskC) { + // L3. 执行C任务 + return new ProcessResult(true, "xxx"); + } + + return new ProcessResult(false, "UNKNOWN_TYPE_OF_SUB_TASK"); + } + + @Override + public ProcessResult reduce(TaskContext context, List taskResults) { + // L4. 执行最终 Reduce 任务,taskResults保存了之前所有任务的结果 + taskResults.forEach(taskResult -> { + // do something... + }); + return new ProcessResult(true, "reduce success"); + } + + private static class TaskA { + } + private static class TaskB { + } + private static class TaskC { + + } +}