[dev] add new OpenAPI saveJob

This commit is contained in:
tjq 2020-05-14 14:17:49 +08:00
parent 8bc576f5e3
commit 2dad0d6525
10 changed files with 263 additions and 51 deletions

View File

@ -1,12 +1,16 @@
package com.github.kfcfans.oms.client;
import com.github.kfcfans.oms.client.model.ClientJobInfo;
import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.OpenAPIConstant;
import com.github.kfcfans.oms.common.request.http.JobInfoRequest;
import com.github.kfcfans.oms.common.response.ResultDTO;
import com.github.kfcfans.oms.common.utils.HttpUtils;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import okhttp3.FormBody;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import org.apache.commons.lang3.StringUtils;
@ -27,6 +31,7 @@ public class OhMyClient {
private Long appId;
private static final String URL_PATTERN = "http://%s%s%s";
private static final Joiner commaJoiner = Joiner.on(",").skipNulls();
/**
* 初始化 OhMyClient 客户端
@ -48,7 +53,7 @@ public class OhMyClient {
if (resultDTO.isSuccess()) {
appId = Long.parseLong(resultDTO.getData().toString());
}else {
throw new RuntimeException(resultDTO.getMessage());
throw new OmsOpenApiException(resultDTO.getMessage());
}
}
log.info("[OhMyClient] {}'s client bootstrap successfully.", appName);
@ -60,6 +65,52 @@ public class OhMyClient {
}
/* ************* Job 区 ************* */
/**
* 保存任务包括创建与修改
* @param newJobInfo 任务详细参数
* @return 创建的任务ID
* @throws Exception 异常
*/
public ResultDTO<Long> saveJob(ClientJobInfo newJobInfo) throws Exception {
String designatedWorkers = null;
if (newJobInfo.getDesignatedWorkers() != null && !newJobInfo.getDesignatedWorkers().isEmpty()) {
designatedWorkers = commaJoiner.join(newJobInfo.getDesignatedWorkers());
}
JobInfoRequest jobInfoRequest = JobInfoRequest.builder().id(newJobInfo.getJobId())
.jobName(newJobInfo.getJobName())
.jobDescription(newJobInfo.getJobDescription())
.appId(appId)
.jobParams(newJobInfo.getJobParams())
.timeExpressionType(newJobInfo.getTimeExpressionType().name())
.timeExpression(newJobInfo.getTimeExpression())
.executeType(newJobInfo.getExecuteType().name())
.processorType(newJobInfo.getProcessorType().name())
.processorInfo(newJobInfo.getProcessorInfo())
.maxInstanceNum(newJobInfo.getMaxInstanceNum())
.concurrency(newJobInfo.getConcurrency())
.instanceTimeLimit(newJobInfo.getInstanceTimeLimit())
.instanceRetryNum(newJobInfo.getInstanceRetryNum())
.taskRetryNum(newJobInfo.getTaskRetryNum())
.minCpuCores(newJobInfo.getMinCpuCores())
.minMemorySpace(newJobInfo.getMinMemorySpace())
.minDiskSpace(newJobInfo.getMinDiskSpace())
.enable(newJobInfo.isEnable())
.designatedWorkers(designatedWorkers)
.maxWorkerCount(newJobInfo.getMaxWorkerCount())
.notifyUserIds(newJobInfo.getNotifyUserIds())
.build();
String url = getUrl(OpenAPIConstant.SAVE_JOB);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JsonUtils.toJSONStringUnsafe(jobInfoRequest);
String post = HttpUtils.post(url, RequestBody.create(json, jsonType));
return JsonUtils.parseObject(post, ResultDTO.class);
}
/**
* 禁用某个任务
* @param jobId 任务ID

View File

@ -0,0 +1,29 @@
package com.github.kfcfans.oms.client;
/**
* 异常
*
* @author tjq
* @since 2020/5/14
*/
public class OmsOpenApiException extends RuntimeException {
public OmsOpenApiException() {
}
public OmsOpenApiException(String message) {
super(message);
}
public OmsOpenApiException(String message, Throwable cause) {
super(message, cause);
}
public OmsOpenApiException(Throwable cause) {
super(cause);
}
public OmsOpenApiException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,77 @@
package com.github.kfcfans.oms.client.model;
import com.github.kfcfans.oms.common.ExecuteType;
import com.github.kfcfans.oms.common.ProcessorType;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.google.common.collect.Lists;
import lombok.Data;
import java.util.List;
/**
* oms-client 使用的 JobInfo 对象用于创建/更新 任务
* id == null -> 创建
* id != null -> 更新更新为全字段覆盖即需要保证该对象包含所有参数不能仅传入更新字段
*
* @author tjq
* @since 2020/5/14
*/
@Data
public class ClientJobInfo {
// null -> 新增否则为更新
private Long jobId;
/* ************************** 任务基本信息 ************************** */
// 任务名称
private String jobName;
// 任务描述
private String jobDescription;
// 任务自带的参数
private String jobParams;
/* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
private TimeExpressionType timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
private String timeExpression;
/* ************************** 执行方式 ************************** */
// 执行类型单机/广播/MR
private ExecuteType executeType;
// 执行器类型Java/Shell
private ProcessorType processorType;
// 执行器信息
private String processorInfo;
/* ************************** 运行时配置 ************************** */
// 最大同时运行任务数默认 1
private Integer maxInstanceNum = 1;
// 并发度同时执行某个任务的最大线程数量
private Integer concurrency = 5;
// 任务整体超时时间
private Long instanceTimeLimit = 0L;
/* ************************** 重试配置 ************************** */
private Integer instanceRetryNum = 0;
private Integer taskRetryNum = 0;
/* ************************** 繁忙机器配置 ************************** */
// 最低CPU核心数量0代表不限
private double minCpuCores = 0;
// 最低内存空间单位 GB0代表不限
private double minMemorySpace = 0;
// 最低磁盘空间单位 GB0代表不限
private double minDiskSpace = 0;
/* ************************** 集群配置 ************************** */
// 指定机器运行空代表不限非空则只会使用其中的机器运行多值逗号分割
private List<String> designatedWorkers = Lists.newLinkedList();
// 最大机器数量<=0 代表无限制
private Integer maxWorkerCount = 0;
// 报警用户ID列表多值逗号分隔
private List<Long> notifyUserIds = Lists.newLinkedList();
// 是否启用任务
private boolean enable = true;
}

View File

@ -1,5 +1,11 @@
import com.github.kfcfans.oms.client.model.ClientJobInfo;
import com.github.kfcfans.oms.common.ExecuteType;
import com.github.kfcfans.oms.common.ProcessorType;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.response.ResultDTO;
import com.github.kfcfans.oms.client.OhMyClient;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@ -18,6 +24,25 @@ public class TestClient {
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test");
}
@Test
public void testSaveJob() throws Exception {
ClientJobInfo newJobInfo = new ClientJobInfo();
newJobInfo.setJobId(6L);
newJobInfo.setJobName("omsOpenAPIJob");
newJobInfo.setJobDescription("tes OpenAPI");
newJobInfo.setJobParams("{'aa':'bb'}");
newJobInfo.setTimeExpressionType(TimeExpressionType.CRON);
newJobInfo.setTimeExpression("0 0 * * * ? ");
newJobInfo.setExecuteType(ExecuteType.STANDALONE);
newJobInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA);
newJobInfo.setProcessorInfo("com.github.kfcfans.oms.server.tester.OmsLogPerformanceTester");
newJobInfo.setDesignatedWorkers(Lists.newArrayList("192.168.1.1:2777"));
ResultDTO<Long> resultDTO = ohMyClient.saveJob(newJobInfo);
System.out.println(JsonUtils.toJSONString(resultDTO));
}
@Test
public void testStopInstance() throws Exception {
ResultDTO<Void> res = ohMyClient.stopInstance(132522955178508352L);

View File

@ -1,21 +1,26 @@
package com.github.kfcfans.oms.server.web.request;
package com.github.kfcfans.oms.common.request.http;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
import java.util.List;
/**
* 创建/修改 JobInfo 请求
* 测试用例快速复制区域MAP_REDUCEEMBEDDED_JAVACRONcom.github.kfcfans.oms.processors.TestMapReduceProcessor
*
* @author tjq
* @since 2020/3/30
*/
@Data
public class ModifyJobInfoRequest {
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class JobInfoRequest {
// null -> 插入否则为更新
// 任务IDjobIdnull -> 插入否则为更新
private Long id;
/* ************************** 任务基本信息 ************************** */
// 任务名称
@ -68,7 +73,6 @@ public class ModifyJobInfoRequest {
// 1 正常运行2 停止不再调度
private boolean enable;
private Date gmtCreate;
/* ************************** 集群配置 ************************** */
// 指定机器运行空代表不限非空则只会使用其中的机器运行多值逗号分割

View File

@ -1,8 +1,6 @@
package com.github.kfcfans.oms.common.utils;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
@ -27,6 +25,10 @@ public class JsonUtils {
return null;
}
public static String toJSONStringUnsafe(Object obj) throws JsonProcessingException {
return objectMapper.writeValueAsString(obj);
}
public static byte[] toBytes(Object obj) {
try {
return objectMapper.writeValueAsBytes(obj);

View File

@ -1,15 +1,21 @@
package com.github.kfcfans.oms.server.service;
import com.github.kfcfans.oms.common.ExecuteType;
import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.ProcessorType;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.request.http.JobInfoRequest;
import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.service.id.IdGenerateService;
import com.github.kfcfans.oms.server.service.instance.InstanceService;
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@ -40,6 +46,58 @@ public class JobService {
@Resource
private InstanceInfoRepository instanceInfoRepository;
private static final Joiner commaJoiner = Joiner.on(",").skipNulls();
/**
* 保存/修改任务
* @param request 任务请求
* @return 创建的任务IDjobId
* @throws Exception 异常
*/
public Long saveJob(JobInfoRequest request) throws Exception {
JobInfoDO jobInfoDO;
if (request.getId() != null) {
jobInfoDO = jobInfoRepository.findById(request.getId()).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + request.getId()));
}else {
jobInfoDO = new JobInfoDO();
}
// 值拷贝
BeanUtils.copyProperties(request, jobInfoDO);
// 拷贝枚举值
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(request.getTimeExpressionType());
jobInfoDO.setExecuteType(ExecuteType.valueOf(request.getExecuteType()).getV());
jobInfoDO.setProcessorType(ProcessorType.valueOf(request.getProcessorType()).getV());
jobInfoDO.setTimeExpressionType(timeExpressionType.getV());
jobInfoDO.setStatus(request.isEnable() ? JobStatus.ENABLE.getV() : JobStatus.DISABLE.getV());
if (jobInfoDO.getMaxWorkerCount() == null) {
jobInfoDO.setMaxInstanceNum(0);
}
// 转化报警用户列表
if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) {
jobInfoDO.setNotifyUserIds(commaJoiner.join(request.getNotifyUserIds()));
}
// 计算下次调度时间
Date now = new Date();
if (timeExpressionType == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(request.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(now);
jobInfoDO.setNextTriggerTime(nextValidTime.getTime());
}
jobInfoDO.setGmtModified(now);
if (request.getId() == null) {
jobInfoDO.setGmtCreate(now);
}
JobInfoDO res = jobInfoRepository.saveAndFlush(jobInfoDO);
return res.getId();
}
/**
* 手动立即运行某个任务
* @param jobId 任务ID

View File

@ -10,7 +10,7 @@ import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoReposito
import com.github.kfcfans.oms.common.response.ResultDTO;
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.oms.server.service.JobService;
import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest;
import com.github.kfcfans.oms.common.request.http.JobInfoRequest;
import com.github.kfcfans.oms.server.web.request.QueryJobInfoRequest;
import com.github.kfcfans.oms.server.web.response.JobInfoVO;
import com.google.common.base.Joiner;
@ -48,44 +48,10 @@ public class JobController {
private JobInfoRepository jobInfoRepository;
private static final Splitter commaSplitter = Splitter.on(",");
private static final Joiner commaJoiner = Joiner.on(",").skipNulls();
@PostMapping("/save")
public ResultDTO<Void> saveJobInfo(@RequestBody ModifyJobInfoRequest request) throws Exception {
JobInfoDO jobInfoDO = new JobInfoDO();
BeanUtils.copyProperties(request, jobInfoDO);
// 拷贝枚举值
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(request.getTimeExpressionType());
jobInfoDO.setExecuteType(ExecuteType.valueOf(request.getExecuteType()).getV());
jobInfoDO.setProcessorType(ProcessorType.valueOf(request.getProcessorType()).getV());
jobInfoDO.setTimeExpressionType(timeExpressionType.getV());
jobInfoDO.setStatus(request.isEnable() ? JobStatus.ENABLE.getV() : JobStatus.DISABLE.getV());
if (jobInfoDO.getMaxWorkerCount() == null) {
jobInfoDO.setMaxInstanceNum(0);
}
// 转化报警用户列表
if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) {
jobInfoDO.setNotifyUserIds(commaJoiner.join(request.getNotifyUserIds()));
}
// 计算下次调度时间
Date now = new Date();
if (timeExpressionType == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(request.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(now);
jobInfoDO.setNextTriggerTime(nextValidTime.getTime());
}
if (request.getId() == null) {
jobInfoDO.setGmtCreate(now);
}
jobInfoDO.setGmtModified(now);
jobInfoRepository.saveAndFlush(jobInfoDO);
public ResultDTO<Void> saveJobInfo(@RequestBody JobInfoRequest request) throws Exception {
jobService.saveJob(request);
return ResultDTO.success(null);
}

View File

@ -8,7 +8,7 @@ import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoReposito
import com.github.kfcfans.oms.server.service.CacheService;
import com.github.kfcfans.oms.server.service.JobService;
import com.github.kfcfans.oms.server.service.instance.InstanceService;
import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest;
import com.github.kfcfans.oms.common.request.http.JobInfoRequest;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
@ -45,8 +45,8 @@ public class OpenAPIController {
/* ************* Job 区 ************* */
@PostMapping(OpenAPIConstant.SAVE_JOB)
public ResultDTO<Void> newJob(ModifyJobInfoRequest request) {
return null;
public ResultDTO<Long> saveJob(@RequestBody JobInfoRequest request) throws Exception {
return ResultDTO.success(jobService.saveJob(request));
}
@GetMapping(OpenAPIConstant.DELETE_JOB)

View File

@ -1,10 +1,10 @@
# STEP1: 系统部署 & 初始化
## 部署
# STEP1: 调度中心部署 & 初始化
## 调度中心部署
#### 要求
* 运行环境JDK8+
* 编译环境Maven3+
* 关系数据库任意Spring Data JPA支持的关系型数据库MySQL/Oracle/MS SQLServer...
* mongoDB任意支持GridFS的mongoDB版本4.2.6测试通过,其余未经测试,仅从理论角度分析可用)
* mongoDB(可选)任意支持GridFS的mongoDB版本4.2.6测试通过,其余未经测试,仅从理论角度分析可用)
#### 流程
1. 部署数据库:由于任务调度中心的数据持久层基于`Spring Data Jpa`实现,**开发者仅需要完成数据库的创建**即运行SQL`CREATE database if NOT EXISTS oms-product default character set utf8mb4 collate utf8mb4_unicode_ci;`