diff --git a/oh-my-scheduler-client/pom.xml b/oh-my-scheduler-client/pom.xml new file mode 100644 index 00000000..13608555 --- /dev/null +++ b/oh-my-scheduler-client/pom.xml @@ -0,0 +1,44 @@ + + + + oh-my-scheduler + com.github.kfcfans + 1.0.0-SNAPSHOT + + + 4.0.0 + oh-my-scheduler-client + 1.0.0-SNAPSHOT + jar + + + 1.0.0-SNAPSHOT + 5.6.1 + 1.2.68 + + + + + + com.github.kfcfans + oh-my-scheduler-common + ${oms.common.version} + + + + com.alibaba + fastjson + ${fastjson.version} + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + + \ No newline at end of file diff --git a/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java b/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java new file mode 100644 index 00000000..15a07b55 --- /dev/null +++ b/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java @@ -0,0 +1,148 @@ +package com.github.kfcfans.oms.client; + +import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.common.OpenAPIConstant; +import com.github.kfcfans.common.response.ResultDTO; +import com.github.kfcfans.common.utils.HttpUtils; +import lombok.extern.slf4j.Slf4j; +import okhttp3.FormBody; +import okhttp3.RequestBody; +import org.apache.commons.lang3.StringUtils; + +import java.util.Objects; + +/** + * OpenAPI 客户端 + * V1.0.0 摒弃一切优雅设计,先实现再说... + * + * @author tjq + * @since 2020/4/15 + */ +@Slf4j +@SuppressWarnings("rawtypes, unchecked") +public class OhMyClient { + + private String domain; + private Long appId; + + private static final String URL_PATTERN = "http://%s%s%s"; + + /** + * 初始化 OhMyClient 客户端 + * @param domain 服务器地址,eg:192.168.1.1:7700(选定主机,无HA保证) / www.oms-server.com(内网域名,自行完成DNS & Proxy) + * @param appName 负责的应用名称 + */ + public OhMyClient(String domain, String appName) throws Exception { + + Objects.requireNonNull(domain, "domain can't be null!"); + Objects.requireNonNull(appName, "appName can't be null"); + + this.domain = domain; + + // 验证 appName 可用性 & server可用性 + String url = getUrl(OpenAPIConstant.ASSERT) + "?appName=" + appName; + String result = HttpUtils.get(url); + if (StringUtils.isNotEmpty(result)) { + ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class); + if (resultDTO.isSuccess()) { + appId = Long.parseLong(resultDTO.getData().toString()); + }else { + throw new RuntimeException(resultDTO.getMessage()); + } + } + log.info("[OhMyClient] {}'s client bootstrap successfully.", appName); + } + + + private String getUrl(String path) { + return String.format(URL_PATTERN, domain, OpenAPIConstant.WEB_PATH, path); + } + + /* ************* Job 区 ************* */ + /** + * 禁用某个任务 + * @param jobId 任务ID + * @return 标准返回对象 + * @throws Exception 异常 + */ + public ResultDTO disableJob(Long jobId) throws Exception { + String url = getUrl(OpenAPIConstant.DISABLE_JOB); + RequestBody body = new FormBody.Builder() + .add("jobId", jobId.toString()) + .add("appId", appId.toString()) + .build(); + String post = HttpUtils.post(url, body); + return JSONObject.parseObject(post, ResultDTO.class); + } + + /** + * 删除某个任务 + * @param jobId 任务ID + * @return 标准返回对象 + * @throws Exception 异常 + */ + public ResultDTO deleteJob(Long jobId) throws Exception { + String url = getUrl(OpenAPIConstant.DELETE_JOB); + RequestBody body = new FormBody.Builder() + .add("jobId", jobId.toString()) + .add("appId", appId.toString()) + .build(); + String post = HttpUtils.post(url, body); + return JSONObject.parseObject(post, ResultDTO.class); + } + + /** + * 运行某个任务 + * @param jobId 任务ID + * @param instanceParams 任务实例的参数 + * @return 任务实例ID(instanceId) + * @throws Exception 异常 + */ + public ResultDTO runJob(Long jobId, String instanceParams) throws Exception { + String url = getUrl(OpenAPIConstant.RUN_JOB); + final FormBody.Builder builder = new FormBody.Builder() + .add("jobId", jobId.toString()) + .add("appId", appId.toString()); + + if (StringUtils.isNotEmpty(instanceParams)) { + builder.add("instanceParams", instanceParams); + } + String post = HttpUtils.post(url, builder.build()); + return JSONObject.parseObject(post, ResultDTO.class); + } + public ResultDTO runJob(Long jobId) throws Exception { + return runJob(jobId, null); + } + + /* ************* Instance 区 ************* */ + /** + * 停止应用实例 + * @param instanceId 应用实例ID + * @return true -> 停止成功,false -> 停止失败 + * @throws Exception 异常 + */ + public ResultDTO stopInstance(Long instanceId) throws Exception { + String url = getUrl(OpenAPIConstant.STOP_INSTANCE); + RequestBody body = new FormBody.Builder() + .add("instanceId", instanceId.toString()) + .add("appId", appId.toString()) + .build(); + String post = HttpUtils.post(url, body); + return JSONObject.parseObject(post, ResultDTO.class); + } + + /** + * 查询应用实例状态 + * @param instanceId 应用实例ID + * @return {@link com.github.kfcfans.common.InstanceStatus} 的枚举值 + * @throws Exception 异常 + */ + public ResultDTO fetchInstanceStatus(Long instanceId) throws Exception { + String url = getUrl(OpenAPIConstant.FETCH_INSTANCE_STATUS); + RequestBody body = new FormBody.Builder() + .add("instanceId", instanceId.toString()) + .build(); + String post = HttpUtils.post(url, body); + return JSONObject.parseObject(post, ResultDTO.class); + } +} diff --git a/oh-my-scheduler-client/src/test/java/TestClient.java b/oh-my-scheduler-client/src/test/java/TestClient.java new file mode 100644 index 00000000..386191df --- /dev/null +++ b/oh-my-scheduler-client/src/test/java/TestClient.java @@ -0,0 +1,30 @@ +import com.github.kfcfans.oms.client.OhMyClient; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * 测试 Client + * + * @author tjq + * @since 2020/4/15 + */ +public class TestClient { + + private static OhMyClient ohMyClient; + + @BeforeAll + public static void initClient() throws Exception { + ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test"); + } + + @Test + public void testInstanceOpenAPI() throws Exception { + System.out.println(ohMyClient.stopInstance(1586855173043L)); + System.out.println(ohMyClient.fetchInstanceStatus(1586855173043L)); + } + + @Test + public void testJobOpenAPI() throws Exception { + System.out.println(ohMyClient.runJob(1L, "hhhh")); + } +} diff --git a/oh-my-scheduler-common/pom.xml b/oh-my-scheduler-common/pom.xml index 32252222..b80f7089 100644 --- a/oh-my-scheduler-common/pom.xml +++ b/oh-my-scheduler-common/pom.xml @@ -17,6 +17,7 @@ 1.7.30 3.10 28.2-jre + 4.4.1 @@ -40,6 +41,13 @@ guava ${guava.version} + + + + com.squareup.okhttp3 + okhttp + ${okhttp.version} + \ No newline at end of file diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/OpenAPIConstant.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/OpenAPIConstant.java new file mode 100644 index 00000000..417175dc --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/OpenAPIConstant.java @@ -0,0 +1,20 @@ +package com.github.kfcfans.common; + +/** + * OpenAPI 常量 + * + * @author tjq + * @since 2020/4/15 + */ +public class OpenAPIConstant { + + public static final String WEB_PATH = "/openApi"; + public static final String ASSERT = "/assert"; + public static final String SAVE_JOB = "/saveJob"; + public static final String DELETE_JOB = "/deleteJob"; + public static final String DISABLE_JOB = "/disableJob"; + public static final String RUN_JOB = "/runJob"; + public static final String STOP_INSTANCE = "/stopInstance"; + public static final String FETCH_INSTANCE_STATUS = "/fetchInstanceStatus"; + +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/ResultDTO.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/ResultDTO.java index 6b0bffe5..e6b5c200 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/ResultDTO.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/ResultDTO.java @@ -2,6 +2,7 @@ package com.github.kfcfans.common.response; import lombok.Getter; import lombok.Setter; +import lombok.ToString; import org.apache.commons.lang3.exception.ExceptionUtils; /** @@ -12,10 +13,13 @@ import org.apache.commons.lang3.exception.ExceptionUtils; */ @Getter @Setter +@ToString public class ResultDTO { private boolean success; + // 数据(success为 true 时存在) private T data; + // 错误信息(success为 false 时存在) private String message; public static ResultDTO success(T data) { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/HttpUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/HttpUtils.java similarity index 64% rename from oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/HttpUtils.java rename to oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/HttpUtils.java index f92e233a..355fe3e2 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/HttpUtils.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/HttpUtils.java @@ -1,7 +1,8 @@ -package com.github.kfcfans.oms.worker.common.utils; +package com.github.kfcfans.common.utils; import okhttp3.OkHttpClient; import okhttp3.Request; +import okhttp3.RequestBody; import okhttp3.Response; import java.io.IOException; @@ -39,4 +40,17 @@ public class HttpUtils { return null; } + public static String post(String url, RequestBody requestBody) throws IOException { + Request request = new Request.Builder() + .post(requestBody) + .url(url) + .build(); + try (Response response = client.newCall(request).execute()) { + if (response.code() == HTTP_SUCCESS_CODE) { + return Objects.requireNonNull(response.body()).string(); + } + } + return null; + } + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java index 0648633e..1dd972a1 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java @@ -14,7 +14,7 @@ import lombok.Getter; public enum JobStatus { ENABLE(1), - STOPPED(2), + DISABLE(2), DELETED(99); private int v; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/CacheService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/CacheService.java index c4de915a..f6e09107 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/CacheService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/CacheService.java @@ -1,6 +1,8 @@ package com.github.kfcfans.oms.server.service; +import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -23,14 +25,25 @@ public class CacheService { @Resource private JobInfoRepository jobInfoRepository; + @Resource + private InstanceLogRepository instanceLogRepository; private final Cache jobId2JobNameCache; + private final Cache instanceId2AppId; + private final Cache jobId2AppId; public CacheService() { jobId2JobNameCache = CacheBuilder.newBuilder() .expireAfterWrite(Duration.ofHours(1)) .maximumSize(1024) .build(); + + instanceId2AppId = CacheBuilder.newBuilder() + .maximumSize(4096) + .build(); + jobId2AppId = CacheBuilder.newBuilder() + .maximumSize(4096) + .build(); } /** @@ -40,11 +53,48 @@ public class CacheService { try { return jobId2JobNameCache.get(jobId, () -> { Optional jobInfoDOOptional = jobInfoRepository.findById(jobId); - // 防止缓存穿透 hhh + // 防止缓存穿透 hhh(但是一开始没有,后来创建的情况下会有问题,不过问题不大,这里就不管了) return jobInfoDOOptional.map(JobInfoDO::getJobName).orElse(""); }); }catch (Exception e) { - log.error("[CacheService] getJobName for {} failed.", jobId, e); + log.error("[CacheService] getAppIdByInstanceId for {} failed.", jobId, e); + } + return null; + } + + public Long getAppIdByInstanceId(Long instanceId) { + + try { + return instanceId2AppId.get(instanceId, () -> { + // 内部记录数据库异常 + try { + InstanceLogDO instanceLog = instanceLogRepository.findByInstanceId(instanceId); + if (instanceLog != null) { + return instanceLog.getAppId(); + } + }catch (Exception e) { + log.error("[CacheService] getAppId for instanceId:{} failed.", instanceId, e); + } + return null; + }); + }catch (Exception ignore) { + // 忽略缓存 load 失败的异常 + } + return null; + } + + public Long getAppIdByJobId(Long jobId) { + try { + return jobId2AppId.get(jobId, () -> { + try { + Optional jobInfoDOOptional = jobInfoRepository.findById(jobId); + return jobInfoDOOptional.map(JobInfoDO::getAppId).orElse(null); + }catch (Exception e) { + log.error("[CacheService] getAppId for job:{} failed.", jobId, e); + } + return null; + }); + } catch (Exception ignore) { } return null; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/JobService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/JobService.java new file mode 100644 index 00000000..d2b807e7 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/JobService.java @@ -0,0 +1,123 @@ +package com.github.kfcfans.oms.server.service; + +import com.github.kfcfans.common.InstanceStatus; +import com.github.kfcfans.common.TimeExpressionType; +import com.github.kfcfans.oms.server.common.constans.JobStatus; +import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; +import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; +import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; +import com.github.kfcfans.oms.server.service.id.IdGenerateService; +import com.github.kfcfans.oms.server.service.instance.InstanceService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import javax.annotation.Resource; +import java.util.Date; +import java.util.List; +import java.util.Optional; + +/** + * 任务服务 + * + * @author tjq + * @since 2020/4/15 + */ +@Slf4j +@Service +public class JobService { + + @Resource + private InstanceService instanceService; + + @Resource + private DispatchService dispatchService; + @Resource + private IdGenerateService idGenerateService; + @Resource + private JobInfoRepository jobInfoRepository; + @Resource + private InstanceLogRepository instanceLogRepository; + + /** + * 手动立即运行某个任务 + * @param jobId 任务ID + * @param instanceParams 任务实例参数 + * @return 任务实例ID + */ + public long runJob(Long jobId, String instanceParams) { + Optional jobInfoOPT = jobInfoRepository.findById(jobId); + if (!jobInfoOPT.isPresent()) { + throw new IllegalArgumentException("can't find job by jobId:" + jobId); + } + JobInfoDO jobInfo = jobInfoOPT.get(); + long instanceId = idGenerateService.allocate(); + + InstanceLogDO executeLog = new InstanceLogDO(); + executeLog.setJobId(jobId); + executeLog.setAppId(jobInfo.getAppId()); + executeLog.setInstanceId(instanceId); + executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); + executeLog.setExpectedTriggerTime(System.currentTimeMillis()); + executeLog.setGmtCreate(new Date()); + executeLog.setGmtModified(executeLog.getGmtCreate()); + + instanceLogRepository.saveAndFlush(executeLog); + dispatchService.dispatch(jobInfo, executeLog.getInstanceId(), 0, instanceParams); + return instanceId; + } + + /** + * 删除某个任务 + * @param jobId 任务ID + */ + public void deleteJob(Long jobId) { + shutdownOrStopJob(jobId, JobStatus.DELETED); + } + + /** + * 禁用某个任务 + */ + public void disableJob(Long jobId) { + shutdownOrStopJob(jobId, JobStatus.DISABLE); + } + + /** + * 停止或删除某个JOB + * 秒级任务还要额外停止正在运行的任务实例 + */ + private void shutdownOrStopJob(Long jobId, JobStatus status) throws IllegalArgumentException { + + // 1. 先更新 job_info 表 + Optional jobInfoOPT = jobInfoRepository.findById(jobId); + if (!jobInfoOPT.isPresent()) { + throw new IllegalArgumentException("can't find job by jobId:" + jobId); + } + JobInfoDO jobInfoDO = jobInfoOPT.get(); + jobInfoDO.setStatus(status.getV()); + jobInfoRepository.saveAndFlush(jobInfoDO); + + // 2. 关闭秒级任务 + TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); + if (timeExpressionType == TimeExpressionType.CRON || timeExpressionType == TimeExpressionType.API) { + return; + } + List executeLogs = instanceLogRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus); + if (CollectionUtils.isEmpty(executeLogs)) { + return; + } + if (executeLogs.size() > 1) { + log.warn("[JobController] frequent job should just have one running instance, there must have some bug."); + } + executeLogs.forEach(instance -> { + try { + + // 重复查询了数据库,不过问题不大,这个调用量很小 + instanceService.stopInstance(instance.getInstanceId()); + }catch (Exception ignore) { + } + }); + } + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/IdGenerateService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/IdGenerateService.java index eb1b0326..412668d0 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/IdGenerateService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/IdGenerateService.java @@ -7,12 +7,10 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import javax.annotation.Resource; - /** * 唯一ID生成服务,使用 Twitter snowflake 算法 - * 机房ID:固定为0,占用三位(8个机房怎么样也够了吧) - * 机器ID:数据库自增,占用7位(最多支持128台机器) + * 机房ID:固定为0,占用2位 + * 机器ID:数据库自增,占用8位(最多支持256台机器,如果频繁部署需要删除数据库重置id) * * @author tjq * @since 2020/4/6 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/SnowFlakeIdGenerator.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/SnowFlakeIdGenerator.java index 7b4cb6c1..acbc87c8 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/SnowFlakeIdGenerator.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/SnowFlakeIdGenerator.java @@ -17,8 +17,8 @@ class SnowFlakeIdGenerator { * 每一部分占用的位数 */ private final static long SEQUENCE_BIT = 12; //序列号占用的位数 - private final static long MACHINE_BIT = 7; //机器标识占用的位数 - private final static long DATA_CENTER_BIT = 3;//数据中心占用的位数 + private final static long MACHINE_BIT = 8; //机器标识占用的位数 + private final static long DATA_CENTER_BIT = 2;//数据中心占用的位数 /** * 每一部分的最大值 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java index 7a042e95..21615b2e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java @@ -14,7 +14,6 @@ import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; -import org.springframework.data.domain.*; import org.springframework.stereotype.Service; import javax.annotation.Resource; @@ -69,6 +68,20 @@ public class InstanceService { taskTrackerActor.tell(req, null); } + /** + * 获取任务实例的壮体啊 + * @param instanceId 任务实例ID + * @return 任务实例的状态 + */ + public InstanceStatus getInstanceStatus(Long instanceId) { + InstanceLogDO instanceLogDO = instanceLogRepository.findByInstanceId(instanceId); + if (instanceLogDO == null) { + log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId); + throw new IllegalArgumentException("invalid instanceId: " + instanceId); + } + return InstanceStatus.of(instanceLogDO.getStatus()); + } + /** * 获取任务实例的详细运行详细 * @param instanceId 任务实例ID diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java index d4036785..6472a149 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java @@ -1,19 +1,16 @@ package com.github.kfcfans.oms.server.web.controller; import com.github.kfcfans.common.ExecuteType; -import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.ProcessorType; import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.persistence.PageResult; -import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; -import com.github.kfcfans.oms.server.service.DispatchService; -import com.github.kfcfans.oms.server.service.id.IdGenerateService; +import com.github.kfcfans.oms.server.service.JobService; import com.github.kfcfans.oms.server.service.instance.InstanceService; import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest; import com.github.kfcfans.oms.server.web.request.QueryJobInfoRequest; @@ -24,7 +21,6 @@ import org.springframework.beans.BeanUtils; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; -import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; @@ -46,9 +42,7 @@ import java.util.stream.Collectors; public class JobController { @Resource - private DispatchService dispatchService; - @Resource - private IdGenerateService idGenerateService; + private JobService jobService; @Resource private InstanceService instanceService; @@ -68,7 +62,7 @@ public class JobController { jobInfoDO.setExecuteType(ExecuteType.valueOf(request.getExecuteType()).getV()); jobInfoDO.setProcessorType(ProcessorType.valueOf(request.getProcessorType()).getV()); jobInfoDO.setTimeExpressionType(timeExpressionType.getV()); - jobInfoDO.setStatus(request.isEnable() ? JobStatus.ENABLE.getV() : JobStatus.STOPPED.getV()); + jobInfoDO.setStatus(request.isEnable() ? JobStatus.ENABLE.getV() : JobStatus.DISABLE.getV()); if (jobInfoDO.getMaxWorkerCount() == null) { jobInfoDO.setMaxInstanceNum(0); @@ -90,31 +84,27 @@ public class JobController { // 秒级任务直接调度执行 if (timeExpressionType == TimeExpressionType.FIX_RATE || timeExpressionType == TimeExpressionType.FIX_DELAY) { - runJobImmediately(jobInfoDO); + jobService.runJob(jobInfoDO.getId(), null); } return ResultDTO.success(null); } - @GetMapping("/stop") - public ResultDTO stopJob(Long jobId) throws Exception { - shutdownOrStopJob(jobId, JobStatus.STOPPED); + @GetMapping("/disable") + public ResultDTO disableJob(Long jobId) throws Exception { + jobService.disableJob(jobId); return ResultDTO.success(null); } @GetMapping("/delete") public ResultDTO deleteJob(Long jobId) throws Exception { - shutdownOrStopJob(jobId, JobStatus.DELETED); + jobService.deleteJob(jobId); return ResultDTO.success(null); } @GetMapping("/run") public ResultDTO runImmediately(Long jobId) { - Optional jobInfoOPT = jobInfoRepository.findById(jobId); - if (!jobInfoOPT.isPresent()) { - throw new IllegalArgumentException("can't find job by jobId:" + jobId); - } - runJobImmediately(jobInfoOPT.get()); + jobService.runJob(jobId, null); return ResultDTO.success(null); } @@ -158,59 +148,8 @@ public class JobController { return ResultDTO.success(convertPage(jobInfoPage)); } - /** - * 立即运行JOB - */ - private void runJobImmediately(JobInfoDO jobInfoDO) { - InstanceLogDO executeLog = new InstanceLogDO(); - executeLog.setJobId(jobInfoDO.getId()); - executeLog.setAppId(jobInfoDO.getAppId()); - executeLog.setInstanceId(idGenerateService.allocate()); - executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); - executeLog.setExpectedTriggerTime(System.currentTimeMillis()); - executeLog.setGmtCreate(new Date()); - executeLog.setGmtModified(executeLog.getGmtCreate()); - instanceLogRepository.saveAndFlush(executeLog); - dispatchService.dispatch(jobInfoDO, executeLog.getInstanceId(), 0); - } - /** - * 停止或删除某个JOB - * 秒级任务还要额外停止正在运行的任务实例 - */ - private void shutdownOrStopJob(Long jobId, JobStatus status) throws IllegalArgumentException { - - // 1. 先更新 job_info 表 - Optional jobInfoOPT = jobInfoRepository.findById(jobId); - if (!jobInfoOPT.isPresent()) { - throw new IllegalArgumentException("can't find job by jobId:" + jobId); - } - JobInfoDO jobInfoDO = jobInfoOPT.get(); - jobInfoDO.setStatus(status.getV()); - jobInfoRepository.saveAndFlush(jobInfoDO); - - // 2. 关闭秒级任务 - TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); - if (timeExpressionType == TimeExpressionType.CRON || timeExpressionType == TimeExpressionType.API) { - return; - } - List executeLogs = instanceLogRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus); - if (CollectionUtils.isEmpty(executeLogs)) { - return; - } - if (executeLogs.size() > 1) { - log.warn("[JobController] frequent job has multi instance, there must ha"); - } - executeLogs.forEach(instance -> { - try { - - // 重复查询了数据库,不过问题不大,这个调用量很小 - instanceService.stopInstance(instance.getInstanceId()); - }catch (Exception ignore) { - } - }); - } private static PageResult convertPage(Page jobInfoPage) { List jobInfoVOList = jobInfoPage.getContent().stream().map(JobController::convert).collect(Collectors.toList()); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/OpenAPIController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/OpenAPIController.java new file mode 100644 index 00000000..8af2d420 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/OpenAPIController.java @@ -0,0 +1,99 @@ +package com.github.kfcfans.oms.server.web.controller; + +import com.github.kfcfans.common.InstanceStatus; +import com.github.kfcfans.common.OpenAPIConstant; +import com.github.kfcfans.common.response.ResultDTO; +import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; +import com.github.kfcfans.oms.server.service.CacheService; +import com.github.kfcfans.oms.server.service.JobService; +import com.github.kfcfans.oms.server.service.instance.InstanceService; +import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest; +import org.springframework.web.bind.annotation.*; + +import javax.annotation.Resource; + +/** + * 开发接口控制器,对接 oms-client + * + * @author tjq + * @since 2020/4/15 + */ +@RestController +@RequestMapping(OpenAPIConstant.WEB_PATH) +public class OpenAPIController { + + @Resource + private JobService jobService; + @Resource + private InstanceService instanceService; + + @Resource + private CacheService cacheService; + + @Resource + private AppInfoRepository appInfoRepository; + + @GetMapping(OpenAPIConstant.ASSERT) + public ResultDTO assertAppName(String appName) { + AppInfoDO appInfo = appInfoRepository.findByAppName(appName); + if (appInfo == null) { + return ResultDTO.failed(appName + " is not registered!"); + } + return ResultDTO.success(appInfo.getId()); + } + + /* ************* Job 区 ************* */ + @PostMapping(OpenAPIConstant.SAVE_JOB) + public ResultDTO newJob(ModifyJobInfoRequest request) { + return null; + } + + @GetMapping(OpenAPIConstant.DELETE_JOB) + public ResultDTO deleteJob(Long jobId, Long appId) { + checkJobIdValid(jobId, appId); + jobService.deleteJob(jobId); + return ResultDTO.success(null); + } + @PostMapping(OpenAPIConstant.DISABLE_JOB) + public ResultDTO disableJob(Long jobId, Long appId) { + checkJobIdValid(jobId, appId); + jobService.disableJob(jobId); + return ResultDTO.success(null); + } + @PostMapping(OpenAPIConstant.RUN_JOB) + public ResultDTO runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams) { + checkJobIdValid(jobId, appId); + return ResultDTO.success(jobService.runJob(jobId, instanceParams)); + } + + /* ************* Instance 区 ************* */ + + @PostMapping(OpenAPIConstant.STOP_INSTANCE) + public ResultDTO stopInstance(Long instanceId, Long appId) { + checkInstanceIdValid(instanceId, appId); + instanceService.stopInstance(instanceId); + return ResultDTO.success(null); + } + + @PostMapping(OpenAPIConstant.FETCH_INSTANCE_STATUS) + public ResultDTO fetchInstanceStatus(Long instanceId) { + InstanceStatus instanceStatus = instanceService.getInstanceStatus(instanceId); + return ResultDTO.success(instanceStatus.getV()); + } + + private void checkInstanceIdValid(Long instanceId, Long appId) { + Long realAppId = cacheService.getAppIdByInstanceId(instanceId); + if (appId.equals(realAppId)) { + return; + } + throw new IllegalArgumentException("instance is not belong to the app whose appId is " + appId); + } + private void checkJobIdValid(Long jobId, Long appId) { + Long realAppId = cacheService.getAppIdByJobId(jobId); + if (appId.equals(realAppId)) { + return; + } + throw new IllegalArgumentException("job is not belong to the app whose appId is " + appId); + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java index 2e39804a..0c3464dd 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -12,7 +12,7 @@ import com.github.kfcfans.oms.worker.background.WorkerHealthReporter; import com.github.kfcfans.oms.worker.common.OhMyConfig; import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.utils.NetUtils; -import com.github.kfcfans.oms.worker.common.utils.HttpUtils; +import com.github.kfcfans.common.utils.HttpUtils; import com.github.kfcfans.oms.worker.common.utils.SpringUtils; import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; import com.google.common.base.Stopwatch; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java index 390cdd06..5bbc1a8c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java @@ -4,8 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.oms.worker.OhMyWorker; -import com.github.kfcfans.oms.worker.common.utils.HttpUtils; -import com.github.kfcfans.oms.worker.core.tracker.task.FrequentTaskTracker; +import com.github.kfcfans.common.utils.HttpUtils; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool; import com.google.common.collect.Maps; @@ -15,7 +14,6 @@ import org.springframework.util.StringUtils; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; /** * 服务发现 diff --git a/pom.xml b/pom.xml index 1cf11b5a..80a5ffac 100644 --- a/pom.xml +++ b/pom.xml @@ -11,6 +11,7 @@ oh-my-scheduler-worker oh-my-scheduler-server oh-my-scheduler-common + oh-my-scheduler-client pom