diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java index 4909b8a9..1dc2a169 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java @@ -33,6 +33,8 @@ public class InstanceDetail implements OmsSerializable { // 秒级任务专用 private List subInstanceDetails; + // 重试次数 + private Long runningTimes; // 秒级任务的 extra -> List @Data diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java index b6662bf0..05982f72 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java @@ -81,11 +81,9 @@ public class JobInfoDO { // 最大机器数量 private Integer maxWorkerCount; + // 报警用户ID列表,多值逗号分隔 + private String notifyUserIds; + private Date gmtCreate; private Date gmtModified; - - // 针对只查询 id 的情况 - public JobInfoDO(Long id) { - this.id = id; - } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/UserInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/UserInfoRepository.java index a0257e6e..c98b1aa3 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/UserInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/UserInfoRepository.java @@ -3,6 +3,8 @@ package com.github.kfcfans.oms.server.persistence.repository; import com.github.kfcfans.oms.server.persistence.model.UserInfoDO; import org.springframework.data.jpa.repository.JpaRepository; +import java.util.List; + /** * 用户信息表数据库访问层 * @@ -10,4 +12,7 @@ import org.springframework.data.jpa.repository.JpaRepository; * @since 2020/4/12 */ public interface UserInfoRepository extends JpaRepository { + + List findByUsernameLike(String username); + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/alarm/AlarmService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/alarm/AlarmService.java new file mode 100644 index 00000000..6242ea29 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/alarm/AlarmService.java @@ -0,0 +1,35 @@ +package com.github.kfcfans.oms.server.service.alarm; + +import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; +import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.List; + +/** + * 报警服务 + * + * @author tjq + * @since 2020/4/19 + */ +@Slf4j +public class AlarmService { + + private static List alarmableList = Lists.newLinkedList(); + + public static void alarm(JobInfoDO jobInfo, InstanceLogDO instanceLog) { + if (CollectionUtils.isEmpty(alarmableList)) { + return; + } + alarmableList.forEach(alarmable -> { + try { + alarmable.alarm(jobInfo, instanceLog); + }catch (Exception e) { + log.warn("[AlarmService] alarm failed.", e); + } + }); + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/alarm/Alarmable.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/alarm/Alarmable.java new file mode 100644 index 00000000..12aa2bbb --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/alarm/Alarmable.java @@ -0,0 +1,15 @@ +package com.github.kfcfans.oms.server.service.alarm; + +import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; +import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; + +/** + * 报警接口 + * + * @author tjq + * @since 2020/4/19 + */ +public interface Alarmable { + + void alarm(JobInfoDO jobInfo, InstanceLogDO instanceLog); +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/IdGenerateService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/IdGenerateService.java index 412668d0..2615dbec 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/IdGenerateService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/IdGenerateService.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Service; /** * 唯一ID生成服务,使用 Twitter snowflake 算法 * 机房ID:固定为0,占用2位 - * 机器ID:数据库自增,占用8位(最多支持256台机器,如果频繁部署需要删除数据库重置id) + * 机器ID:数据库自增,占用14位(如果频繁部署需要删除数据库重置id) * * @author tjq * @since 2020/4/6 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/SnowFlakeIdGenerator.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/SnowFlakeIdGenerator.java index acbc87c8..0a3aaa8a 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/SnowFlakeIdGenerator.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/id/SnowFlakeIdGenerator.java @@ -16,8 +16,8 @@ class SnowFlakeIdGenerator { /** * 每一部分占用的位数 */ - private final static long SEQUENCE_BIT = 12; //序列号占用的位数 - private final static long MACHINE_BIT = 8; //机器标识占用的位数 + private final static long SEQUENCE_BIT = 6; //序列号占用的位数 + private final static long MACHINE_BIT = 14; //机器标识占用的位数 private final static long DATA_CENTER_BIT = 2;//数据中心占用的位数 /** diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java index 3db0f58d..efbcd6d3 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java @@ -9,6 +9,7 @@ import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.oms.server.service.DispatchService; +import com.github.kfcfans.oms.server.service.timing.schedule.HashedWheelTimerHolder; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; @@ -16,6 +17,7 @@ import org.springframework.beans.BeanUtils; import java.util.Date; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; /** * 管理被调度的服务 @@ -111,7 +113,15 @@ public class InstanceManager { if (updateEntity.getRunningTimes() <= instanceId2JobInfo.get(instanceId).getInstanceRetryNum()) { log.info("[InstanceManager] instance(instanceId={}) execute failed but will take the {}th retry.", instanceId, updateEntity.getRunningTimes()); - getDispatchService().dispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes()); + + // 延迟10S重试(由于重试不改变 instanceId,如果派发到同一台机器,上一个 TaskTracker 还处于资源释放阶段,无法创建新的TaskTracker,任务失败) + HashedWheelTimerHolder.TIMER.schedule(() -> { + getDispatchService().dispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes()); + }, 10, TimeUnit.SECONDS); + + // 修改状态为 等待派发,正式开始重试 + // 问题:会丢失以往的调度记录(actualTriggerTime什么的都会被覆盖) + updateEntity.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); }else { updateEntity.setResult(req.getResult()); updateEntity.setFinishedTime(System.currentTimeMillis()); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java index e21e91b2..86f2530f 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java @@ -128,7 +128,9 @@ public class InstanceService { AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (askResponse.isSuccess()) { - return askResponse.getData(InstanceDetail.class); + InstanceDetail instanceDetail = askResponse.getData(InstanceDetail.class); + instanceDetail.setRunningTimes(instanceLogDO.getRunningTimes()); + return instanceDetail; }else { log.warn("[InstanceService] ask InstanceStatus from TaskTracker failed, the message is {}.", askResponse.getMessage()); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java index 4b5fe231..cb602ba2 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java @@ -35,7 +35,7 @@ import java.util.stream.Collectors; public class InstanceStatusCheckService { private static final int MAX_BATCH_NUM = 10; - private static final long DISPATCH_TIMEOUT_MS = 10000; + private static final long DISPATCH_TIMEOUT_MS = 30000; private static final long RECEIVE_TIMEOUT_MS = 60000; private static final long RUNNING_TIMEOUT_MS = 60000; @@ -77,6 +77,13 @@ public class InstanceStatusCheckService { if (!CollectionUtils.isEmpty(waitingDispatchInstances)) { log.warn("[InstanceStatusCheckService] instances({}) is not triggered as expected.", waitingDispatchInstances); waitingDispatchInstances.forEach(instance -> { + + // 过滤因为失败重试而改成 WAITING_DISPATCH 状态的任务实例 + long t = System.currentTimeMillis() - instance.getGmtModified().getTime(); + if (t < DISPATCH_TIMEOUT_MS) { + return; + } + // 重新派发(orElseGet用于消除编译器警告...) JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), 0); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java index 64341343..b2eae4e1 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java @@ -13,12 +13,15 @@ import com.github.kfcfans.oms.server.service.JobService; import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest; import com.github.kfcfans.oms.server.web.request.QueryJobInfoRequest; import com.github.kfcfans.oms.server.web.response.JobInfoVO; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; @@ -44,6 +47,9 @@ public class JobController { @Resource private JobInfoRepository jobInfoRepository; + private static final Splitter commaSplitter = Splitter.on(","); + private static final Joiner commaJoiner = Joiner.on(",").skipNulls(); + @PostMapping("/save") public ResultDTO saveJobInfo(@RequestBody ModifyJobInfoRequest request) throws Exception { @@ -61,6 +67,11 @@ public class JobController { jobInfoDO.setMaxInstanceNum(0); } + // 转化报警用户列表 + if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) { + jobInfoDO.setNotifyUserIds(commaJoiner.join(request.getNotifyUserIds())); + } + // 计算下次调度时间 Date now = new Date(); if (timeExpressionType == TimeExpressionType.CRON) { @@ -159,6 +170,12 @@ public class JobController { jobInfoVO.setProcessorType(processorType.name()); jobInfoVO.setEnable(jobInfoDO.getStatus() == JobStatus.ENABLE.getV()); + if (!StringUtils.isEmpty(jobInfoDO.getNotifyUserIds())) { + jobInfoVO.setNotifyUserIds(commaSplitter.splitToList(jobInfoDO.getNotifyUserIds())); + }else { + jobInfoVO.setNotifyUserIds(Lists.newLinkedList()); + } + return jobInfoVO; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/UserInfoController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/UserInfoController.java index 19f687ee..52d1a8fd 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/UserInfoController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/UserInfoController.java @@ -4,10 +4,13 @@ import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.oms.server.persistence.model.UserInfoDO; import com.github.kfcfans.oms.server.persistence.repository.UserInfoRepository; import com.github.kfcfans.oms.server.web.request.ModifyUserInfoRequest; +import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import org.springframework.beans.BeanUtils; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; @@ -39,11 +42,23 @@ public class UserInfoController { } @GetMapping("list") - public ResultDTO> list() { - List result = userInfoRepository.findAll().stream().map(x -> new UserItemVO(x.getId(), x.getUsername())).collect(Collectors.toList()); - return ResultDTO.success(result); + public ResultDTO> list(@RequestParam(required = false) String name) { + + List result; + if (StringUtils.isEmpty(name)) { + result = userInfoRepository.findAll(); + }else { + result = userInfoRepository.findByUsernameLike("%" + name + "%"); + } + return ResultDTO.success(convert(result)); } + private static List convert(List data) { + if (CollectionUtils.isEmpty(data)) { + return Lists.newLinkedList(); + } + return data.stream().map(x -> new UserItemVO(x.getId(), x.getUsername())).collect(Collectors.toList()); + } @Getter @NoArgsConstructor diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java index 2ba5174b..8412f0a2 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java @@ -3,6 +3,7 @@ package com.github.kfcfans.oms.server.web.request; import lombok.Data; import java.util.Date; +import java.util.List; /** * 创建/修改 JobInfo 请求 @@ -74,4 +75,7 @@ public class ModifyJobInfoRequest { private String designatedWorkers; // 最大机器数量 private Integer maxWorkerCount; + + // 报警用户ID列表 + private List notifyUserIds; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/JobInfoVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/JobInfoVO.java index b398070d..91875b1b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/JobInfoVO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/JobInfoVO.java @@ -3,6 +3,7 @@ package com.github.kfcfans.oms.server.web.response; import lombok.Data; import java.util.Date; +import java.util.List; /** * JobInfo 对外展示对象 @@ -72,4 +73,7 @@ public class JobInfoVO { private String designatedWorkers; // 最大机器数量 private Integer maxWorkerCount; + + // 报警用户ID列表 + private List notifyUserIds; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java index 974d3b27..f02801a6 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -75,7 +75,11 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di try { // 校验 appName - appId = assertAppName(); + if (!config.isEnableTestMode()) { + appId = assertAppName(); + }else { + log.warn("[OhMyWorker] using TestMode now, it's dangerous if this is production env."); + } // 初始化 ActorSystem Map overrideConfig = Maps.newHashMap(); @@ -99,7 +103,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di // 服务发现 currentServer = ServerDiscoveryService.discovery(); - if (StringUtils.isEmpty(currentServer)) { + if (StringUtils.isEmpty(currentServer) && !config.isEnableTestMode()) { throw new RuntimeException("can't find any available server, this worker has been quarantined."); } log.info("[OhMyWorker] discovery server succeed, current server is {}.", currentServer); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java index 6d2fe306..ffab54ec 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java @@ -30,4 +30,9 @@ public class OhMyConfig { */ private int maxResultLength = 8096; + /** + * 启动测试模式,true情况下,不再尝试连接 server 并验证appName + * true -> 用于本地写单元测试调试; false -> 默认值,标准模式 + */ + private boolean enableTestMode = false; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index 96976c45..f85fe675 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -264,6 +264,7 @@ public abstract class TaskTracker { */ public void destroy() { + Stopwatch sw = Stopwatch.createStarted(); // 0. 开始关闭线程池,不能使用 shutdownNow(),因为 destroy 方法本身就在 scheduledPool 的线程中执行,强行关闭会打断 destroy 的执行。 scheduledPool.shutdown(); @@ -290,7 +291,7 @@ public abstract class TaskTracker { // 3. 移除顶层引用,送去 GC TaskTrackerPool.remove(instanceId); - log.info("[TaskTracker-{}] TaskTracker has left the world, bye~", instanceId); + log.info("[TaskTracker-{}] TaskTracker has left the world(using {}), bye~", instanceId, sw.stop()); // 4. 强制关闭线程池 if (!scheduledPool.isTerminated()) { diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/CommonTaskTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/CommonTaskTrackerTest.java index 72929dc6..874b1dd4 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/CommonTaskTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/CommonTaskTrackerTest.java @@ -30,6 +30,8 @@ public class CommonTaskTrackerTest { OhMyConfig ohMyConfig = new OhMyConfig(); ohMyConfig.setAppName("oms-test"); ohMyConfig.setServerAddress(Lists.newArrayList("127.0.0.1:7700")); + ohMyConfig.setEnableTestMode(true); + OhMyWorker worker = new OhMyWorker(); worker.setConfig(ohMyConfig); worker.init(); diff --git a/oh-my-scheduler-worker/src/test/resources/oms-akka-test.conf b/oh-my-scheduler-worker/src/test/resources/oms-akka-test.conf index c258dfbe..af019eec 100644 --- a/oh-my-scheduler-worker/src/test/resources/oms-akka-test.conf +++ b/oh-my-scheduler-worker/src/test/resources/oms-akka-test.conf @@ -2,7 +2,11 @@ akka { actor { # for test provider = remote - allow-java-serialization = on + allow-java-serialization = off + + serialization-bindings { + "com.github.kfcfans.common.OmsSerializable" = jackson-cbor + } } remote { artery {