develop the open api

This commit is contained in:
tjq 2020-04-15 16:41:47 +08:00
parent 19ffe6d052
commit a626e6f9ee
18 changed files with 574 additions and 85 deletions

View File

@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oh-my-scheduler</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>oh-my-scheduler-client</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<oms.common.version>1.0.0-SNAPSHOT</oms.common.version>
<junit.version>5.6.1</junit.version>
<fastjson.version>1.2.68</fastjson.version>
</properties>
<dependencies>
<!-- oms-common -->
<dependency>
<groupId>com.github.kfcfans</groupId>
<artifactId>oh-my-scheduler-common</artifactId>
<version>${oms.common.version}</version>
</dependency>
<!-- fastJson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- Junit 测试 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,148 @@
package com.github.kfcfans.oms.client;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.OpenAPIConstant;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.common.utils.HttpUtils;
import lombok.extern.slf4j.Slf4j;
import okhttp3.FormBody;
import okhttp3.RequestBody;
import org.apache.commons.lang3.StringUtils;
import java.util.Objects;
/**
* OpenAPI 客户端
* V1.0.0 摒弃一切优雅设计先实现再说...
*
* @author tjq
* @since 2020/4/15
*/
@Slf4j
@SuppressWarnings("rawtypes, unchecked")
public class OhMyClient {
private String domain;
private Long appId;
private static final String URL_PATTERN = "http://%s%s%s";
/**
* 初始化 OhMyClient 客户端
* @param domain 服务器地址eg:192.168.1.1:7700选定主机无HA保证 / www.oms-server.com内网域名自行完成DNS & Proxy
* @param appName 负责的应用名称
*/
public OhMyClient(String domain, String appName) throws Exception {
Objects.requireNonNull(domain, "domain can't be null!");
Objects.requireNonNull(appName, "appName can't be null");
this.domain = domain;
// 验证 appName 可用性 & server可用性
String url = getUrl(OpenAPIConstant.ASSERT) + "?appName=" + appName;
String result = HttpUtils.get(url);
if (StringUtils.isNotEmpty(result)) {
ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
appId = Long.parseLong(resultDTO.getData().toString());
}else {
throw new RuntimeException(resultDTO.getMessage());
}
}
log.info("[OhMyClient] {}'s client bootstrap successfully.", appName);
}
private String getUrl(String path) {
return String.format(URL_PATTERN, domain, OpenAPIConstant.WEB_PATH, path);
}
/* ************* Job 区 ************* */
/**
* 禁用某个任务
* @param jobId 任务ID
* @return 标准返回对象
* @throws Exception 异常
*/
public ResultDTO<Void> disableJob(Long jobId) throws Exception {
String url = getUrl(OpenAPIConstant.DISABLE_JOB);
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = HttpUtils.post(url, body);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
* 删除某个任务
* @param jobId 任务ID
* @return 标准返回对象
* @throws Exception 异常
*/
public ResultDTO<Void> deleteJob(Long jobId) throws Exception {
String url = getUrl(OpenAPIConstant.DELETE_JOB);
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = HttpUtils.post(url, body);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
* 运行某个任务
* @param jobId 任务ID
* @param instanceParams 任务实例的参数
* @return 任务实例IDinstanceId
* @throws Exception 异常
*/
public ResultDTO<Long> runJob(Long jobId, String instanceParams) throws Exception {
String url = getUrl(OpenAPIConstant.RUN_JOB);
final FormBody.Builder builder = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString());
if (StringUtils.isNotEmpty(instanceParams)) {
builder.add("instanceParams", instanceParams);
}
String post = HttpUtils.post(url, builder.build());
return JSONObject.parseObject(post, ResultDTO.class);
}
public ResultDTO<Long> runJob(Long jobId) throws Exception {
return runJob(jobId, null);
}
/* ************* Instance 区 ************* */
/**
* 停止应用实例
* @param instanceId 应用实例ID
* @return true -> 停止成功false -> 停止失败
* @throws Exception 异常
*/
public ResultDTO<Void> stopInstance(Long instanceId) throws Exception {
String url = getUrl(OpenAPIConstant.STOP_INSTANCE);
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = HttpUtils.post(url, body);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
* 查询应用实例状态
* @param instanceId 应用实例ID
* @return {@link com.github.kfcfans.common.InstanceStatus} 的枚举值
* @throws Exception 异常
*/
public ResultDTO<Integer> fetchInstanceStatus(Long instanceId) throws Exception {
String url = getUrl(OpenAPIConstant.FETCH_INSTANCE_STATUS);
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.build();
String post = HttpUtils.post(url, body);
return JSONObject.parseObject(post, ResultDTO.class);
}
}

View File

@ -0,0 +1,30 @@
import com.github.kfcfans.oms.client.OhMyClient;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
* 测试 Client
*
* @author tjq
* @since 2020/4/15
*/
public class TestClient {
private static OhMyClient ohMyClient;
@BeforeAll
public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test");
}
@Test
public void testInstanceOpenAPI() throws Exception {
System.out.println(ohMyClient.stopInstance(1586855173043L));
System.out.println(ohMyClient.fetchInstanceStatus(1586855173043L));
}
@Test
public void testJobOpenAPI() throws Exception {
System.out.println(ohMyClient.runJob(1L, "hhhh"));
}
}

View File

@ -17,6 +17,7 @@
<slf4j.version>1.7.30</slf4j.version>
<commons.lang.version>3.10</commons.lang.version>
<guava.version>28.2-jre</guava.version>
<okhttp.version>4.4.1</okhttp.version>
</properties>
<dependencies>
@ -40,6 +41,13 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- OKHttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,20 @@
package com.github.kfcfans.common;
/**
* OpenAPI 常量
*
* @author tjq
* @since 2020/4/15
*/
public class OpenAPIConstant {
public static final String WEB_PATH = "/openApi";
public static final String ASSERT = "/assert";
public static final String SAVE_JOB = "/saveJob";
public static final String DELETE_JOB = "/deleteJob";
public static final String DISABLE_JOB = "/disableJob";
public static final String RUN_JOB = "/runJob";
public static final String STOP_INSTANCE = "/stopInstance";
public static final String FETCH_INSTANCE_STATUS = "/fetchInstanceStatus";
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.common.response;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.exception.ExceptionUtils;
/**
@ -12,10 +13,13 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
*/
@Getter
@Setter
@ToString
public class ResultDTO<T> {
private boolean success;
// 数据success为 true 时存在
private T data;
// 错误信息success为 false 时存在
private String message;
public static <T> ResultDTO<T> success(T data) {

View File

@ -1,7 +1,8 @@
package com.github.kfcfans.oms.worker.common.utils;
package com.github.kfcfans.common.utils;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import java.io.IOException;
@ -39,4 +40,17 @@ public class HttpUtils {
return null;
}
public static String post(String url, RequestBody requestBody) throws IOException {
Request request = new Request.Builder()
.post(requestBody)
.url(url)
.build();
try (Response response = client.newCall(request).execute()) {
if (response.code() == HTTP_SUCCESS_CODE) {
return Objects.requireNonNull(response.body()).string();
}
}
return null;
}
}

View File

@ -14,7 +14,7 @@ import lombok.Getter;
public enum JobStatus {
ENABLE(1),
STOPPED(2),
DISABLE(2),
DELETED(99);
private int v;

View File

@ -1,6 +1,8 @@
package com.github.kfcfans.oms.server.service;
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@ -23,14 +25,25 @@ public class CacheService {
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private InstanceLogRepository instanceLogRepository;
private final Cache<Long, String> jobId2JobNameCache;
private final Cache<Long, Long> instanceId2AppId;
private final Cache<Long, Long> jobId2AppId;
public CacheService() {
jobId2JobNameCache = CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofHours(1))
.maximumSize(1024)
.build();
instanceId2AppId = CacheBuilder.newBuilder()
.maximumSize(4096)
.build();
jobId2AppId = CacheBuilder.newBuilder()
.maximumSize(4096)
.build();
}
/**
@ -40,11 +53,48 @@ public class CacheService {
try {
return jobId2JobNameCache.get(jobId, () -> {
Optional<JobInfoDO> jobInfoDOOptional = jobInfoRepository.findById(jobId);
// 防止缓存穿透 hhh
// 防止缓存穿透 hhh但是一开始没有后来创建的情况下会有问题不过问题不大这里就不管了
return jobInfoDOOptional.map(JobInfoDO::getJobName).orElse("");
});
}catch (Exception e) {
log.error("[CacheService] getJobName for {} failed.", jobId, e);
log.error("[CacheService] getAppIdByInstanceId for {} failed.", jobId, e);
}
return null;
}
public Long getAppIdByInstanceId(Long instanceId) {
try {
return instanceId2AppId.get(instanceId, () -> {
// 内部记录数据库异常
try {
InstanceLogDO instanceLog = instanceLogRepository.findByInstanceId(instanceId);
if (instanceLog != null) {
return instanceLog.getAppId();
}
}catch (Exception e) {
log.error("[CacheService] getAppId for instanceId:{} failed.", instanceId, e);
}
return null;
});
}catch (Exception ignore) {
// 忽略缓存 load 失败的异常
}
return null;
}
public Long getAppIdByJobId(Long jobId) {
try {
return jobId2AppId.get(jobId, () -> {
try {
Optional<JobInfoDO> jobInfoDOOptional = jobInfoRepository.findById(jobId);
return jobInfoDOOptional.map(JobInfoDO::getAppId).orElse(null);
}catch (Exception e) {
log.error("[CacheService] getAppId for job:{} failed.", jobId, e);
}
return null;
});
} catch (Exception ignore) {
}
return null;
}

View File

@ -0,0 +1,123 @@
package com.github.kfcfans.oms.server.service;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.service.id.IdGenerateService;
import com.github.kfcfans.oms.server.service.instance.InstanceService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Optional;
/**
* 任务服务
*
* @author tjq
* @since 2020/4/15
*/
@Slf4j
@Service
public class JobService {
@Resource
private InstanceService instanceService;
@Resource
private DispatchService dispatchService;
@Resource
private IdGenerateService idGenerateService;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private InstanceLogRepository instanceLogRepository;
/**
* 手动立即运行某个任务
* @param jobId 任务ID
* @param instanceParams 任务实例参数
* @return 任务实例ID
*/
public long runJob(Long jobId, String instanceParams) {
Optional<JobInfoDO> jobInfoOPT = jobInfoRepository.findById(jobId);
if (!jobInfoOPT.isPresent()) {
throw new IllegalArgumentException("can't find job by jobId:" + jobId);
}
JobInfoDO jobInfo = jobInfoOPT.get();
long instanceId = idGenerateService.allocate();
InstanceLogDO executeLog = new InstanceLogDO();
executeLog.setJobId(jobId);
executeLog.setAppId(jobInfo.getAppId());
executeLog.setInstanceId(instanceId);
executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
executeLog.setExpectedTriggerTime(System.currentTimeMillis());
executeLog.setGmtCreate(new Date());
executeLog.setGmtModified(executeLog.getGmtCreate());
instanceLogRepository.saveAndFlush(executeLog);
dispatchService.dispatch(jobInfo, executeLog.getInstanceId(), 0, instanceParams);
return instanceId;
}
/**
* 删除某个任务
* @param jobId 任务ID
*/
public void deleteJob(Long jobId) {
shutdownOrStopJob(jobId, JobStatus.DELETED);
}
/**
* 禁用某个任务
*/
public void disableJob(Long jobId) {
shutdownOrStopJob(jobId, JobStatus.DISABLE);
}
/**
* 停止或删除某个JOB
* 秒级任务还要额外停止正在运行的任务实例
*/
private void shutdownOrStopJob(Long jobId, JobStatus status) throws IllegalArgumentException {
// 1. 先更新 job_info
Optional<JobInfoDO> jobInfoOPT = jobInfoRepository.findById(jobId);
if (!jobInfoOPT.isPresent()) {
throw new IllegalArgumentException("can't find job by jobId:" + jobId);
}
JobInfoDO jobInfoDO = jobInfoOPT.get();
jobInfoDO.setStatus(status.getV());
jobInfoRepository.saveAndFlush(jobInfoDO);
// 2. 关闭秒级任务
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
if (timeExpressionType == TimeExpressionType.CRON || timeExpressionType == TimeExpressionType.API) {
return;
}
List<InstanceLogDO> executeLogs = instanceLogRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus);
if (CollectionUtils.isEmpty(executeLogs)) {
return;
}
if (executeLogs.size() > 1) {
log.warn("[JobController] frequent job should just have one running instance, there must have some bug.");
}
executeLogs.forEach(instance -> {
try {
// 重复查询了数据库不过问题不大这个调用量很小
instanceService.stopInstance(instance.getInstanceId());
}catch (Exception ignore) {
}
});
}
}

View File

@ -7,12 +7,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* 唯一ID生成服务使用 Twitter snowflake 算法
* 机房ID固定为0占用三位8个机房怎么样也够了吧
* 机器ID数据库自增占用7位最多支持128台机器
* 机房ID固定为0占用2位
* 机器ID数据库自增占用8位最多支持256台机器如果频繁部署需要删除数据库重置id
*
* @author tjq
* @since 2020/4/6

View File

@ -17,8 +17,8 @@ class SnowFlakeIdGenerator {
* 每一部分占用的位数
*/
private final static long SEQUENCE_BIT = 12; //序列号占用的位数
private final static long MACHINE_BIT = 7; //机器标识占用的位数
private final static long DATA_CENTER_BIT = 3;//数据中心占用的位数
private final static long MACHINE_BIT = 8; //机器标识占用的位数
private final static long DATA_CENTER_BIT = 2;//数据中心占用的位数
/**
* 每一部分的最大值

View File

@ -14,7 +14,6 @@ import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.data.domain.*;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@ -69,6 +68,20 @@ public class InstanceService {
taskTrackerActor.tell(req, null);
}
/**
* 获取任务实例的壮体啊
* @param instanceId 任务实例ID
* @return 任务实例的状态
*/
public InstanceStatus getInstanceStatus(Long instanceId) {
InstanceLogDO instanceLogDO = instanceLogRepository.findByInstanceId(instanceId);
if (instanceLogDO == null) {
log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId);
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
}
return InstanceStatus.of(instanceLogDO.getStatus());
}
/**
* 获取任务实例的详细运行详细
* @param instanceId 任务实例ID

View File

@ -1,19 +1,16 @@
package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.persistence.PageResult;
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.service.DispatchService;
import com.github.kfcfans.oms.server.service.id.IdGenerateService;
import com.github.kfcfans.oms.server.service.JobService;
import com.github.kfcfans.oms.server.service.instance.InstanceService;
import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest;
import com.github.kfcfans.oms.server.web.request.QueryJobInfoRequest;
@ -24,7 +21,6 @@ import org.springframework.beans.BeanUtils;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
@ -46,9 +42,7 @@ import java.util.stream.Collectors;
public class JobController {
@Resource
private DispatchService dispatchService;
@Resource
private IdGenerateService idGenerateService;
private JobService jobService;
@Resource
private InstanceService instanceService;
@ -68,7 +62,7 @@ public class JobController {
jobInfoDO.setExecuteType(ExecuteType.valueOf(request.getExecuteType()).getV());
jobInfoDO.setProcessorType(ProcessorType.valueOf(request.getProcessorType()).getV());
jobInfoDO.setTimeExpressionType(timeExpressionType.getV());
jobInfoDO.setStatus(request.isEnable() ? JobStatus.ENABLE.getV() : JobStatus.STOPPED.getV());
jobInfoDO.setStatus(request.isEnable() ? JobStatus.ENABLE.getV() : JobStatus.DISABLE.getV());
if (jobInfoDO.getMaxWorkerCount() == null) {
jobInfoDO.setMaxInstanceNum(0);
@ -90,31 +84,27 @@ public class JobController {
// 秒级任务直接调度执行
if (timeExpressionType == TimeExpressionType.FIX_RATE || timeExpressionType == TimeExpressionType.FIX_DELAY) {
runJobImmediately(jobInfoDO);
jobService.runJob(jobInfoDO.getId(), null);
}
return ResultDTO.success(null);
}
@GetMapping("/stop")
public ResultDTO<Void> stopJob(Long jobId) throws Exception {
shutdownOrStopJob(jobId, JobStatus.STOPPED);
@GetMapping("/disable")
public ResultDTO<Void> disableJob(Long jobId) throws Exception {
jobService.disableJob(jobId);
return ResultDTO.success(null);
}
@GetMapping("/delete")
public ResultDTO<Void> deleteJob(Long jobId) throws Exception {
shutdownOrStopJob(jobId, JobStatus.DELETED);
jobService.deleteJob(jobId);
return ResultDTO.success(null);
}
@GetMapping("/run")
public ResultDTO<Void> runImmediately(Long jobId) {
Optional<JobInfoDO> jobInfoOPT = jobInfoRepository.findById(jobId);
if (!jobInfoOPT.isPresent()) {
throw new IllegalArgumentException("can't find job by jobId:" + jobId);
}
runJobImmediately(jobInfoOPT.get());
jobService.runJob(jobId, null);
return ResultDTO.success(null);
}
@ -158,59 +148,8 @@ public class JobController {
return ResultDTO.success(convertPage(jobInfoPage));
}
/**
* 立即运行JOB
*/
private void runJobImmediately(JobInfoDO jobInfoDO) {
InstanceLogDO executeLog = new InstanceLogDO();
executeLog.setJobId(jobInfoDO.getId());
executeLog.setAppId(jobInfoDO.getAppId());
executeLog.setInstanceId(idGenerateService.allocate());
executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
executeLog.setExpectedTriggerTime(System.currentTimeMillis());
executeLog.setGmtCreate(new Date());
executeLog.setGmtModified(executeLog.getGmtCreate());
instanceLogRepository.saveAndFlush(executeLog);
dispatchService.dispatch(jobInfoDO, executeLog.getInstanceId(), 0);
}
/**
* 停止或删除某个JOB
* 秒级任务还要额外停止正在运行的任务实例
*/
private void shutdownOrStopJob(Long jobId, JobStatus status) throws IllegalArgumentException {
// 1. 先更新 job_info
Optional<JobInfoDO> jobInfoOPT = jobInfoRepository.findById(jobId);
if (!jobInfoOPT.isPresent()) {
throw new IllegalArgumentException("can't find job by jobId:" + jobId);
}
JobInfoDO jobInfoDO = jobInfoOPT.get();
jobInfoDO.setStatus(status.getV());
jobInfoRepository.saveAndFlush(jobInfoDO);
// 2. 关闭秒级任务
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
if (timeExpressionType == TimeExpressionType.CRON || timeExpressionType == TimeExpressionType.API) {
return;
}
List<InstanceLogDO> executeLogs = instanceLogRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus);
if (CollectionUtils.isEmpty(executeLogs)) {
return;
}
if (executeLogs.size() > 1) {
log.warn("[JobController] frequent job has multi instance, there must ha");
}
executeLogs.forEach(instance -> {
try {
// 重复查询了数据库不过问题不大这个调用量很小
instanceService.stopInstance(instance.getInstanceId());
}catch (Exception ignore) {
}
});
}
private static PageResult<JobInfoVO> convertPage(Page<JobInfoDO> jobInfoPage) {
List<JobInfoVO> jobInfoVOList = jobInfoPage.getContent().stream().map(JobController::convert).collect(Collectors.toList());

View File

@ -0,0 +1,99 @@
package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.OpenAPIConstant;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.service.CacheService;
import com.github.kfcfans.oms.server.service.JobService;
import com.github.kfcfans.oms.server.service.instance.InstanceService;
import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* 开发接口控制器对接 oms-client
*
* @author tjq
* @since 2020/4/15
*/
@RestController
@RequestMapping(OpenAPIConstant.WEB_PATH)
public class OpenAPIController {
@Resource
private JobService jobService;
@Resource
private InstanceService instanceService;
@Resource
private CacheService cacheService;
@Resource
private AppInfoRepository appInfoRepository;
@GetMapping(OpenAPIConstant.ASSERT)
public ResultDTO<Long> assertAppName(String appName) {
AppInfoDO appInfo = appInfoRepository.findByAppName(appName);
if (appInfo == null) {
return ResultDTO.failed(appName + " is not registered!");
}
return ResultDTO.success(appInfo.getId());
}
/* ************* Job 区 ************* */
@PostMapping(OpenAPIConstant.SAVE_JOB)
public ResultDTO<Void> newJob(ModifyJobInfoRequest request) {
return null;
}
@GetMapping(OpenAPIConstant.DELETE_JOB)
public ResultDTO<Void> deleteJob(Long jobId, Long appId) {
checkJobIdValid(jobId, appId);
jobService.deleteJob(jobId);
return ResultDTO.success(null);
}
@PostMapping(OpenAPIConstant.DISABLE_JOB)
public ResultDTO<Void> disableJob(Long jobId, Long appId) {
checkJobIdValid(jobId, appId);
jobService.disableJob(jobId);
return ResultDTO.success(null);
}
@PostMapping(OpenAPIConstant.RUN_JOB)
public ResultDTO<Long> runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams) {
checkJobIdValid(jobId, appId);
return ResultDTO.success(jobService.runJob(jobId, instanceParams));
}
/* ************* Instance 区 ************* */
@PostMapping(OpenAPIConstant.STOP_INSTANCE)
public ResultDTO<Void> stopInstance(Long instanceId, Long appId) {
checkInstanceIdValid(instanceId, appId);
instanceService.stopInstance(instanceId);
return ResultDTO.success(null);
}
@PostMapping(OpenAPIConstant.FETCH_INSTANCE_STATUS)
public ResultDTO<Integer> fetchInstanceStatus(Long instanceId) {
InstanceStatus instanceStatus = instanceService.getInstanceStatus(instanceId);
return ResultDTO.success(instanceStatus.getV());
}
private void checkInstanceIdValid(Long instanceId, Long appId) {
Long realAppId = cacheService.getAppIdByInstanceId(instanceId);
if (appId.equals(realAppId)) {
return;
}
throw new IllegalArgumentException("instance is not belong to the app whose appId is " + appId);
}
private void checkJobIdValid(Long jobId, Long appId) {
Long realAppId = cacheService.getAppIdByJobId(jobId);
if (appId.equals(realAppId)) {
return;
}
throw new IllegalArgumentException("job is not belong to the app whose appId is " + appId);
}
}

View File

@ -12,7 +12,7 @@ import com.github.kfcfans.oms.worker.background.WorkerHealthReporter;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.common.utils.HttpUtils;
import com.github.kfcfans.common.utils.HttpUtils;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.google.common.base.Stopwatch;

View File

@ -4,8 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.utils.HttpUtils;
import com.github.kfcfans.oms.worker.core.tracker.task.FrequentTaskTracker;
import com.github.kfcfans.common.utils.HttpUtils;
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool;
import com.google.common.collect.Maps;
@ -15,7 +14,6 @@ import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 服务发现

View File

@ -11,6 +11,7 @@
<module>oh-my-scheduler-worker</module>
<module>oh-my-scheduler-server</module>
<module>oh-my-scheduler-common</module>
<module>oh-my-scheduler-client</module>
</modules>
<packaging>pom</packaging>