diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java index 8151c0d6..0ce70df7 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java @@ -1,7 +1,10 @@ package com.github.kfcfans.common.model; +import com.sun.istack.internal.NotNull; import lombok.Data; +import java.io.Serializable; + /** * 系统指标 * @@ -9,7 +12,7 @@ import lombok.Data; * @since 2020/3/25 */ @Data -public class SystemMetrics { +public class SystemMetrics implements Serializable, Comparable { // CPU核心数量 private int cpuProcessors; @@ -26,4 +29,36 @@ public class SystemMetrics { private double diskTotal; private double diskUsage; + // 缓存分数 + private int score; + + @Override + public int compareTo(SystemMetrics that) { + return this.calculateScore() - that.calculateScore(); + } + + /** + * 计算得分情况,内存 > CPU > 磁盘 + * 磁盘必须有1G以上的剩余空间 + */ + public int calculateScore() { + + if (score > 0) { + return score; + } + + double availableCPUCores = cpuProcessors * cpuLoad; + double availableMemory = jvmMaxMemory - jvmUsedMemory; + double availableDisk = diskTotal - diskUsage; + + // 最低运行标准,1G磁盘 & 0.5G内存 & 一个可用的CPU核心 + if (availableDisk < 1 || availableMemory < 0.5 || availableCPUCores < 1) { + score = 1; + } else { + // 磁盘只需要满足最低标准即可 + score = (int) (availableMemory * 2 + availableCPUCores); + } + + return score; + } } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerHeartbeat.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerHeartbeat.java index 1dc69bd7..9647bc8e 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerHeartbeat.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerHeartbeat.java @@ -18,6 +18,10 @@ public class WorkerHeartbeat implements Serializable { private String workerAddress; // 当前 appName private String appName; + // 当前 appId + private Long appId; + // 当前时间 + private long heartbeatTime; private SystemMetrics systemMetrics; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/OhMyApplication.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/OhMyApplication.java index efe6bab9..731f5d13 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/OhMyApplication.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/OhMyApplication.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.server; +import com.github.kfcfans.oms.server.core.akka.OhMyServer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -13,6 +14,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; public class OhMyApplication { public static void main(String[] args) { + + // 先启动 ActorSystem + OhMyServer.init(); + + // 再启动SpringBoot SpringApplication.run(OhMyApplication.class, args); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java index 5ab545c3..8eac5e96 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java @@ -1,6 +1,5 @@ package com.github.kfcfans.oms.server.core.akka; -import akka.actor.ActorPath; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; @@ -27,7 +26,7 @@ public class OhMyServer { @Getter private static String actorSystemAddress; - public void init() { + public static void init() { // 1. 启动 ActorSystem Map overrideConfig = Maps.newHashMap(); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java index 14528dce..46c7f3a0 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java @@ -3,6 +3,7 @@ package com.github.kfcfans.oms.server.core.akka; import akka.actor.AbstractActor; import com.github.kfcfans.common.request.WorkerHeartbeat; import com.github.kfcfans.common.response.AskResponse; +import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; import lombok.extern.slf4j.Slf4j; /** @@ -35,5 +36,6 @@ public class ServerActor extends AbstractActor { } private void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) { + WorkerManagerService.updateStatus(heartbeat); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java index b4894417..08065daa 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java @@ -54,6 +54,12 @@ public class JobInfoDO { // 任务的每一个Task超时时间 private Long taskTimeLimit; + // 1 正常运行,2 停止(不再调度) + private Integer status; + // 下一次调度时间 + private Long nextTriggerTime; + + private Date gmtCreate; private Date gmtModified; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java index 327d457b..1d011e58 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java @@ -3,6 +3,8 @@ package com.github.kfcfans.oms.server.persistence.repository; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import org.springframework.data.jpa.repository.JpaRepository; +import java.util.List; + /** * JobInfo 数据访问层 * @@ -10,4 +12,9 @@ import org.springframework.data.jpa.repository.JpaRepository; * @since 2020/4/1 */ public interface JobInfoRepository extends JpaRepository { + + + List findByAppIdInAndNextTriggerTimeLessThanEqual(List appIds, Long time); + + List findByAppIdAndNextTriggerTimeLessThanEqual(Long appId, Long time); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobLogRepository.java index bdc1a0d1..f0dd2cdc 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobLogRepository.java @@ -10,4 +10,7 @@ import org.springframework.data.jpa.repository.JpaRepository; * @since 2020/4/1 */ public interface JobLogRepository extends JpaRepository { + + long countByJobIdAndStatus(Long jobId, Integer status); + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/OmsLockRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/OmsLockRepository.java index d2c9a49b..c3c9e6c7 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/OmsLockRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/OmsLockRepository.java @@ -6,6 +6,7 @@ import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import javax.transaction.Transactional; +import java.util.List; /** * 利用唯一性约束作为数据库锁 @@ -20,5 +21,10 @@ public interface OmsLockRepository extends JpaRepository { @Query(value = "delete from oms_lock where lock_name = ?1", nativeQuery = true) int deleteByLockName(String lockName); + @Modifying + @Transactional + @Query(value = "delete from oms_lock where lock_name in ?1", nativeQuery = true) + int deleteByLockNames(List lockNames); + OmsLockDO findByLockName(String lockName); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java new file mode 100644 index 00000000..ebc3345d --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java @@ -0,0 +1,27 @@ +package com.github.kfcfans.oms.server.service; + +import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.JobLogRepository; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * 派送服务 + * + * @author tjq + * @since 2020/4/5 + */ +@Service +public class DispatchService { + + @Resource + private JobLogRepository jobLogRepository; + + public void dispatch(JobInfoDO jobInfo) { + + // 1. 查询当前运行的实例数 + + + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java new file mode 100644 index 00000000..88680757 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java @@ -0,0 +1,74 @@ +package com.github.kfcfans.oms.server.service.ha; + +import com.github.kfcfans.common.model.SystemMetrics; +import com.github.kfcfans.common.request.WorkerHeartbeat; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.Map; + +/** + * 管理Worker集群状态 + * + * @author tjq + * @since 2020/4/5 + */ +@Slf4j +public class ClusterStatusHolder { + + // 集群所属的应用名称 + private String appName; + // 集群中所有机器的健康状态 + private Map address2Metrics; + // 集群中所有机器的最后心跳时间 + private Map address2ActiveTime; + + private static final long WORKER_TIMEOUT_MS = 30000; + + public ClusterStatusHolder(String appName) { + this.appName = appName; + address2Metrics = Maps.newConcurrentMap(); + address2ActiveTime = Maps.newConcurrentMap(); + } + + /** + * 更新 worker 机器的状态 + */ + public void updateStatus(WorkerHeartbeat heartbeat) { + + String workerAddress = heartbeat.getWorkerAddress(); + long heartbeatTime = heartbeat.getHeartbeatTime(); + + address2Metrics.put(workerAddress, heartbeat.getSystemMetrics()); + Long oldTime = address2ActiveTime.getOrDefault(workerAddress, -1L); + if (heartbeatTime > oldTime) { + address2ActiveTime.put(workerAddress, heartbeatTime); + } + } + + /** + * 选取状态最好的Worker进行任务派发 + * @return Worker的地址(null代表没有可用的Worker) + */ + public String chooseBestWorker() { + + // 直接对 HashMap 根据Value进行排序 + List> entryList = Lists.newArrayList(address2Metrics.entrySet()); + + // 降序排序(Comparator.comparingInt默认为升序,弃用) + entryList.sort((o1, o2) -> o2.getValue().calculateScore() - o1.getValue().calculateScore()); + + for (Map.Entry entry : address2Metrics.entrySet()) { + long lastActiveTime = address2ActiveTime.getOrDefault(entry.getKey(), -1L); + long timeout = System.currentTimeMillis() - lastActiveTime; + if (timeout < WORKER_TIMEOUT_MS) { + return entry.getKey(); + } + } + + log.warn("[ClusterStatusHolder] no worker available for {}, worker status is {}.", appName, address2Metrics); + return null; + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java index b3c7c634..e02fdcbb 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java @@ -15,6 +15,7 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.Duration; import java.util.Date; +import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -35,31 +36,31 @@ public class ServerSelectService { private static final int RETRY_TIMES = 10; private static final long PING_TIMEOUT_MS = 5000; - private static final long WAIT_LOCK_TIME = 1000; - private static final String SERVER_ELECT_LOCK = "server_elect_%s"; + private static final String SERVER_ELECT_LOCK = "server_elect_%d"; /** * 获取某个应用对应的Server * 缺点:如果server死而复生,可能造成worker集群脑裂,不过感觉影响不是很大 & 概率极低,就不管了 - * @param appName 应用名称 + * @param appId 应用ID * @return 当前可用的Server */ - public String getServer(String appName) { + public String getServer(Long appId) { for (int i = 0; i < RETRY_TIMES; i++) { // 无锁获取当前数据库中的Server - AppInfoDO app = appInfoRepository.findByAppName(appName); - if (app == null) { - throw new RuntimeException(appName + " is not registered!"); + Optional appInfoOpt = appInfoRepository.findById(appId); + if (!appInfoOpt.isPresent()) { + throw new RuntimeException(appId + " is not registered!"); } - String originServer = app.getCurrentServer(); + String appName = appInfoOpt.get().getAppName(); + String originServer = appInfoOpt.get().getCurrentServer(); if (isActive(originServer)) { return originServer; } - // 获取失败,重新进行Server选举,需要加锁 - String lockName = String.format(SERVER_ELECT_LOCK, appName); + // 无可用Server,重新进行Server选举,需要加锁 + String lockName = String.format(SERVER_ELECT_LOCK, appId); boolean lockStatus = lockService.lock(lockName); if (!lockStatus) { try { @@ -71,7 +72,10 @@ public class ServerSelectService { try { // 可能上一台机器已经完成了Server选举,需要再次判断 - AppInfoDO appInfo = appInfoRepository.findByAppName(appName); + AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> { + log.error("[ServerSelectService] impossible, unless we just lost our database."); + return null; + }); if (isActive(appInfo.getCurrentServer())) { return appInfo.getCurrentServer(); } @@ -88,7 +92,7 @@ public class ServerSelectService { lockService.unlock(lockName); } } - throw new RuntimeException("server elect failed for app " + appName); + throw new RuntimeException("server elect failed for app " + appId); } private boolean isActive(String serverAddress) { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java new file mode 100644 index 00000000..963ad96b --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java @@ -0,0 +1,55 @@ +package com.github.kfcfans.oms.server.service.ha; + +import com.github.kfcfans.common.request.WorkerHeartbeat; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.Map; + +/** + * Worker 管理服务 + * + * @author tjq + * @since 2020/4/5 + */ +@Slf4j +public class WorkerManagerService { + + private static final Map appName2ClusterStatus = Maps.newConcurrentMap(); + + /** + * 更新状态 + * @param heartbeat Worker的心跳包 + */ + public static void updateStatus(WorkerHeartbeat heartbeat) { + Long appId = heartbeat.getAppId(); + String appName = heartbeat.getAppName(); + ClusterStatusHolder clusterStatusHolder = appName2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName)); + clusterStatusHolder.updateStatus(heartbeat); + } + + /** + * 选择状态最好的Worker执行任务 + * @param appId 应用ID + * @return Worker的地址(null代表没有可用的Worker) + */ + public static String chooseBestWorker(Long appId) { + ClusterStatusHolder clusterStatusHolder = appName2ClusterStatus.get(appId); + if (clusterStatusHolder == null) { + log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId); + return null; + } + return clusterStatusHolder.chooseBestWorker(); + } + + /** + * 获取当前该 Server 管理的所有应用ID + * @return List + */ + public static List listAppIds() { + return Lists.newArrayList(appName2ClusterStatus.keySet()); + } + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/DatabaseLockService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/DatabaseLockService.java index 2049864d..d9150238 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/DatabaseLockService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/DatabaseLockService.java @@ -4,12 +4,14 @@ import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.common.utils.NetUtils; import com.github.kfcfans.oms.server.persistence.model.OmsLockDO; import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -70,4 +72,28 @@ public class DatabaseLockService implements LockService { log.error("[DatabaseLockService] unlock {} failed.", name, e); } } + + @Override + public boolean batchLock(List names) { + List locks = Lists.newLinkedList(); + names.forEach(name -> locks.add(new OmsLockDO(name, NetUtils.getLocalHost()))); + try { + omsLockRepository.saveAll(locks); + omsLockRepository.flush(); + return true; + }catch (DataIntegrityViolationException ignore) { + }catch (Exception e) { + log.warn("[DatabaseLockService] write locks to database failed, lockNames = {}.", names, e); + } + return false; + } + + @Override + public void batchUnLock(List names) { + try { + CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockNames(names)); + }catch (Exception e) { + log.error("[DatabaseLockService] unlocks {} failed.", names, e); + } + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/LockService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/LockService.java index 146d6a68..401003e7 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/LockService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/LockService.java @@ -1,5 +1,7 @@ package com.github.kfcfans.oms.server.service.lock; +import java.util.List; + /** * 锁服务,所有方法都不允许抛出任何异常! * @@ -21,4 +23,7 @@ public interface LockService { */ void unlock(String name); + boolean batchLock(List names); + void batchUnLock(List names); + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/HashedWheelTimerHolder.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/HashedWheelTimerHolder.java new file mode 100644 index 00000000..c13bd148 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/HashedWheelTimerHolder.java @@ -0,0 +1,17 @@ +package com.github.kfcfans.oms.server.service.schedule; + +import com.github.kfcfans.oms.server.common.utils.timewheel.HashedWheelTimer; + +/** + * 时间轮单例 + * + * @author tjq + * @since 2020/4/5 + */ +public class HashedWheelTimerHolder { + + public static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, 4); + + private HashedWheelTimerHolder() { + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java new file mode 100644 index 00000000..a29247a7 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java @@ -0,0 +1,74 @@ +package com.github.kfcfans.oms.server.service.schedule; + +import com.github.kfcfans.common.utils.CommonUtils; +import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; +import com.github.kfcfans.oms.server.persistence.repository.JobLogRepository; +import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; +import com.github.kfcfans.oms.server.service.lock.LockService; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import javax.annotation.Resource; +import java.util.List; +import java.util.stream.Collectors; + +/** + * 任务调度执行服务 + * + * @author tjq + * @since 2020/4/5 + */ +@Slf4j +@Service +public class JobScheduleService { + + private static final int MAX_BATCH_NUM = 10; + + @Resource + private LockService lockService; + @Resource + private JobInfoRepository jobInfoRepository; + @Resource + private JobLogRepository jobLogRepository; + + private static final String SCHEDULE_LOCK = "schedule_lock_%d"; + private static final long SCHEDULE_RATE = 10000; + + @Scheduled(fixedRate = SCHEDULE_RATE) + private void getJob() { + List allAppIds = WorkerManagerService.listAppIds(); + if (CollectionUtils.isEmpty(allAppIds)) { + log.info("[JobScheduleService] current server has no app's job to schedule."); + return; + } + + long timeThreshold = System.currentTimeMillis() + 2 * SCHEDULE_RATE; + Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> { + + List lockNames = partAppIds.stream().map(JobScheduleService::genLock).collect(Collectors.toList()); + // 1. 先批量获取锁,获取不到就改成单个循环模式 + boolean batchLock = lockService.batchLock(lockNames); + if (!batchLock) { + + }else { + try { + List jobInfos = jobInfoRepository.findByAppIdInAndNextTriggerTimeLessThanEqual(partAppIds, timeThreshold); + + // 顺序:先推入进时间轮 -> 写jobLog表 -> 更新nextTriggerTime(原则:宁可重复执行,也不能不调度) + + + }catch (Exception e) { + + } + } + }); + } + + private static String genLock(Long appId) { + return String.format(SCHEDULE_LOCK, appId); + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ServerController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ServerController.java index bf333e4d..5add9d89 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ServerController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ServerController.java @@ -1,5 +1,7 @@ package com.github.kfcfans.oms.server.web.controller; +import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; import com.github.kfcfans.oms.server.service.ha.ServerSelectService; import com.github.kfcfans.oms.server.web.ResultDTO; import org.springframework.web.bind.annotation.GetMapping; @@ -20,10 +22,21 @@ public class ServerController { @Resource private ServerSelectService serverSelectService; + @Resource + private AppInfoRepository appInfoRepository; + + @GetMapping("assert") + public ResultDTO assertAppName(String appName) { + AppInfoDO appInfo = appInfoRepository.findByAppName(appName); + if (appInfo == null) { + return ResultDTO.failed(appName + " is not registered!"); + } + return ResultDTO.success(appInfo.getId()); + } @GetMapping("/acquire") - public ResultDTO acquireServer(String appName) { - String server = serverSelectService.getServer(appName); + public ResultDTO acquireServer(Long appId) { + String server = serverSelectService.getServer(appId); return ResultDTO.success(server); } diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java new file mode 100644 index 00000000..d86e7aae --- /dev/null +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java @@ -0,0 +1,43 @@ +package com.github.kfcfans.oms.server.test; + +import com.github.kfcfans.common.utils.NetUtils; +import com.github.kfcfans.oms.server.persistence.model.OmsLockDO; +import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository; +import org.assertj.core.util.Lists; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import javax.annotation.Resource; +import java.util.List; + +/** + * 数据库层测试 + * + * @author tjq + * @since 2020/4/5 + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public class RepositoryTest { + + @Resource + private OmsLockRepository omsLockRepository; + + /** + * 需要证明批量写入失败后会回滚 + */ + @Test + public void testBatchLock() { + + List locks = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + OmsLockDO lockDO = new OmsLockDO("lock" + i, NetUtils.getLocalHost()); + locks.add(lockDO); + } + omsLockRepository.saveAll(locks); + omsLockRepository.flush(); + } + +} diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/ServiceTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/ServiceTest.java index 6e1201e0..b8ab7207 100644 --- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/ServiceTest.java +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/ServiceTest.java @@ -1,12 +1,14 @@ package com.github.kfcfans.oms.server.test; import com.github.kfcfans.oms.server.service.lock.LockService; +import org.assertj.core.util.Lists; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; +import java.util.List; /** * 服务测试 @@ -30,4 +32,17 @@ public class ServiceTest { lockService.unlock(lockName); } + @Test + public void testBatchLock() { + List lockNames = Lists.newArrayList("a", "b", "C", "d", "e"); + System.out.println(lockService.batchLock(lockNames)); + System.out.println(lockService.batchLock(lockNames)); + } + + @Test + public void testBatchUnLock() { + List lockNames = Lists.newArrayList("a", "b", "C", "d", "e"); + lockService.batchUnLock(lockNames); + } + } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java index 99836763..4cf9a799 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -46,6 +46,8 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean { private static String workerAddress; public static ActorSystem actorSystem; + @Getter + private static Long appId; private static ScheduledExecutorService timingPool; @Override diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java index 193b5f85..c549c947 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java @@ -26,9 +26,12 @@ public class WorkerHealthReportRunnable implements Runnable { SystemMetrics systemMetrics = SystemInfoUtils.getSystemMetrics(); WorkerHeartbeat heartbeat = new WorkerHeartbeat(); + heartbeat.setSystemMetrics(systemMetrics); heartbeat.setWorkerAddress(OhMyWorker.getWorkerAddress()); heartbeat.setAppName(OhMyWorker.getConfig().getAppName()); + heartbeat.setAppId(OhMyWorker.getAppId()); + heartbeat.setHeartbeatTime(System.currentTimeMillis()); // 发送请求 String serverPath = AkkaUtils.getAkkaServerNodePath(RemoteConstant.SERVER_ACTOR_NAME); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SystemInfoUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SystemInfoUtils.java index 3365df22..998a33fb 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SystemInfoUtils.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SystemInfoUtils.java @@ -44,6 +44,8 @@ public class SystemInfoUtils { metrics.setDiskTotal(bytes2GB(total)); metrics.setDiskUsage(metrics.getDiskUsed() / metrics.getDiskTotal()); + // 在Worker完成分数计算,减小Server压力 + metrics.calculateScore(); return metrics; }