try to develop the scheduler, but I think maybe I need to think more...

This commit is contained in:
tjq 2020-04-05 18:44:08 +08:00
parent c064602648
commit 84203c5caa
23 changed files with 445 additions and 17 deletions

View File

@ -1,7 +1,10 @@
package com.github.kfcfans.common.model;
import com.sun.istack.internal.NotNull;
import lombok.Data;
import java.io.Serializable;
/**
* 系统指标
*
@ -9,7 +12,7 @@ import lombok.Data;
* @since 2020/3/25
*/
@Data
public class SystemMetrics {
public class SystemMetrics implements Serializable, Comparable<SystemMetrics> {
// CPU核心数量
private int cpuProcessors;
@ -26,4 +29,36 @@ public class SystemMetrics {
private double diskTotal;
private double diskUsage;
// 缓存分数
private int score;
@Override
public int compareTo(SystemMetrics that) {
return this.calculateScore() - that.calculateScore();
}
/**
* 计算得分情况内存 > CPU > 磁盘
* 磁盘必须有1G以上的剩余空间
*/
public int calculateScore() {
if (score > 0) {
return score;
}
double availableCPUCores = cpuProcessors * cpuLoad;
double availableMemory = jvmMaxMemory - jvmUsedMemory;
double availableDisk = diskTotal - diskUsage;
// 最低运行标准1G磁盘 & 0.5G内存 & 一个可用的CPU核心
if (availableDisk < 1 || availableMemory < 0.5 || availableCPUCores < 1) {
score = 1;
} else {
// 磁盘只需要满足最低标准即可
score = (int) (availableMemory * 2 + availableCPUCores);
}
return score;
}
}

View File

@ -18,6 +18,10 @@ public class WorkerHeartbeat implements Serializable {
private String workerAddress;
// 当前 appName
private String appName;
// 当前 appId
private Long appId;
// 当前时间
private long heartbeatTime;
private SystemMetrics systemMetrics;
}

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.server;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -13,6 +14,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
public class OhMyApplication {
public static void main(String[] args) {
// 先启动 ActorSystem
OhMyServer.init();
// 再启动SpringBoot
SpringApplication.run(OhMyApplication.class, args);
}

View File

@ -1,6 +1,5 @@
package com.github.kfcfans.oms.server.core.akka;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
@ -27,7 +26,7 @@ public class OhMyServer {
@Getter
private static String actorSystemAddress;
public void init() {
public static void init() {
// 1. 启动 ActorSystem
Map<String, Object> overrideConfig = Maps.newHashMap();

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.oms.server.core.akka;
import akka.actor.AbstractActor;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import lombok.extern.slf4j.Slf4j;
/**
@ -35,5 +36,6 @@ public class ServerActor extends AbstractActor {
}
private void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) {
WorkerManagerService.updateStatus(heartbeat);
}
}

View File

@ -54,6 +54,12 @@ public class JobInfoDO {
// 任务的每一个Task超时时间
private Long taskTimeLimit;
// 1 正常运行2 停止不再调度
private Integer status;
// 下一次调度时间
private Long nextTriggerTime;
private Date gmtCreate;
private Date gmtModified;

View File

@ -3,6 +3,8 @@ package com.github.kfcfans.oms.server.persistence.repository;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List;
/**
* JobInfo 数据访问层
*
@ -10,4 +12,9 @@ import org.springframework.data.jpa.repository.JpaRepository;
* @since 2020/4/1
*/
public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> {
List<JobInfoDO> findByAppIdInAndNextTriggerTimeLessThanEqual(List<Long> appIds, Long time);
List<JobInfoDO> findByAppIdAndNextTriggerTimeLessThanEqual(Long appId, Long time);
}

View File

@ -10,4 +10,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
* @since 2020/4/1
*/
public interface JobLogRepository extends JpaRepository<JobLogDO, Long> {
long countByJobIdAndStatus(Long jobId, Integer status);
}

View File

@ -6,6 +6,7 @@ import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import javax.transaction.Transactional;
import java.util.List;
/**
* 利用唯一性约束作为数据库锁
@ -20,5 +21,10 @@ public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> {
@Query(value = "delete from oms_lock where lock_name = ?1", nativeQuery = true)
int deleteByLockName(String lockName);
@Modifying
@Transactional
@Query(value = "delete from oms_lock where lock_name in ?1", nativeQuery = true)
int deleteByLockNames(List<String> lockNames);
OmsLockDO findByLockName(String lockName);
}

View File

@ -0,0 +1,27 @@
package com.github.kfcfans.oms.server.service;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.JobLogRepository;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* 派送服务
*
* @author tjq
* @since 2020/4/5
*/
@Service
public class DispatchService {
@Resource
private JobLogRepository jobLogRepository;
public void dispatch(JobInfoDO jobInfo) {
// 1. 查询当前运行的实例数
}
}

View File

@ -0,0 +1,74 @@
package com.github.kfcfans.oms.server.service.ha;
import com.github.kfcfans.common.model.SystemMetrics;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
/**
* 管理Worker集群状态
*
* @author tjq
* @since 2020/4/5
*/
@Slf4j
public class ClusterStatusHolder {
// 集群所属的应用名称
private String appName;
// 集群中所有机器的健康状态
private Map<String, SystemMetrics> address2Metrics;
// 集群中所有机器的最后心跳时间
private Map<String, Long> address2ActiveTime;
private static final long WORKER_TIMEOUT_MS = 30000;
public ClusterStatusHolder(String appName) {
this.appName = appName;
address2Metrics = Maps.newConcurrentMap();
address2ActiveTime = Maps.newConcurrentMap();
}
/**
* 更新 worker 机器的状态
*/
public void updateStatus(WorkerHeartbeat heartbeat) {
String workerAddress = heartbeat.getWorkerAddress();
long heartbeatTime = heartbeat.getHeartbeatTime();
address2Metrics.put(workerAddress, heartbeat.getSystemMetrics());
Long oldTime = address2ActiveTime.getOrDefault(workerAddress, -1L);
if (heartbeatTime > oldTime) {
address2ActiveTime.put(workerAddress, heartbeatTime);
}
}
/**
* 选取状态最好的Worker进行任务派发
* @return Worker的地址null代表没有可用的Worker
*/
public String chooseBestWorker() {
// 直接对 HashMap 根据Value进行排序
List<Map.Entry<String, SystemMetrics>> entryList = Lists.newArrayList(address2Metrics.entrySet());
// 降序排序Comparator.comparingInt默认为升序弃用
entryList.sort((o1, o2) -> o2.getValue().calculateScore() - o1.getValue().calculateScore());
for (Map.Entry<String, SystemMetrics> entry : address2Metrics.entrySet()) {
long lastActiveTime = address2ActiveTime.getOrDefault(entry.getKey(), -1L);
long timeout = System.currentTimeMillis() - lastActiveTime;
if (timeout < WORKER_TIMEOUT_MS) {
return entry.getKey();
}
}
log.warn("[ClusterStatusHolder] no worker available for {}, worker status is {}.", appName, address2Metrics);
return null;
}
}

View File

@ -15,6 +15,7 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
@ -35,31 +36,31 @@ public class ServerSelectService {
private static final int RETRY_TIMES = 10;
private static final long PING_TIMEOUT_MS = 5000;
private static final long WAIT_LOCK_TIME = 1000;
private static final String SERVER_ELECT_LOCK = "server_elect_%s";
private static final String SERVER_ELECT_LOCK = "server_elect_%d";
/**
* 获取某个应用对应的Server
* 缺点如果server死而复生可能造成worker集群脑裂不过感觉影响不是很大 & 概率极低就不管了
* @param appName 应用名称
* @param appId 应用ID
* @return 当前可用的Server
*/
public String getServer(String appName) {
public String getServer(Long appId) {
for (int i = 0; i < RETRY_TIMES; i++) {
// 无锁获取当前数据库中的Server
AppInfoDO app = appInfoRepository.findByAppName(appName);
if (app == null) {
throw new RuntimeException(appName + " is not registered!");
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
if (!appInfoOpt.isPresent()) {
throw new RuntimeException(appId + " is not registered!");
}
String originServer = app.getCurrentServer();
String appName = appInfoOpt.get().getAppName();
String originServer = appInfoOpt.get().getCurrentServer();
if (isActive(originServer)) {
return originServer;
}
// 获取失败重新进行Server选举需要加锁
String lockName = String.format(SERVER_ELECT_LOCK, appName);
// 无可用Server重新进行Server选举需要加锁
String lockName = String.format(SERVER_ELECT_LOCK, appId);
boolean lockStatus = lockService.lock(lockName);
if (!lockStatus) {
try {
@ -71,7 +72,10 @@ public class ServerSelectService {
try {
// 可能上一台机器已经完成了Server选举需要再次判断
AppInfoDO appInfo = appInfoRepository.findByAppName(appName);
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> {
log.error("[ServerSelectService] impossible, unless we just lost our database.");
return null;
});
if (isActive(appInfo.getCurrentServer())) {
return appInfo.getCurrentServer();
}
@ -88,7 +92,7 @@ public class ServerSelectService {
lockService.unlock(lockName);
}
}
throw new RuntimeException("server elect failed for app " + appName);
throw new RuntimeException("server elect failed for app " + appId);
}
private boolean isActive(String serverAddress) {

View File

@ -0,0 +1,55 @@
package com.github.kfcfans.oms.server.service.ha;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
/**
* Worker 管理服务
*
* @author tjq
* @since 2020/4/5
*/
@Slf4j
public class WorkerManagerService {
private static final Map<Long, ClusterStatusHolder> appName2ClusterStatus = Maps.newConcurrentMap();
/**
* 更新状态
* @param heartbeat Worker的心跳包
*/
public static void updateStatus(WorkerHeartbeat heartbeat) {
Long appId = heartbeat.getAppId();
String appName = heartbeat.getAppName();
ClusterStatusHolder clusterStatusHolder = appName2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
clusterStatusHolder.updateStatus(heartbeat);
}
/**
* 选择状态最好的Worker执行任务
* @param appId 应用ID
* @return Worker的地址null代表没有可用的Worker
*/
public static String chooseBestWorker(Long appId) {
ClusterStatusHolder clusterStatusHolder = appName2ClusterStatus.get(appId);
if (clusterStatusHolder == null) {
log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId);
return null;
}
return clusterStatusHolder.chooseBestWorker();
}
/**
* 获取当前该 Server 管理的所有应用ID
* @return List<AppId>
*/
public static List<Long> listAppIds() {
return Lists.newArrayList(appName2ClusterStatus.keySet());
}
}

View File

@ -4,12 +4,14 @@ import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.server.persistence.model.OmsLockDO;
import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@ -70,4 +72,28 @@ public class DatabaseLockService implements LockService {
log.error("[DatabaseLockService] unlock {} failed.", name, e);
}
}
@Override
public boolean batchLock(List<String> names) {
List<OmsLockDO> locks = Lists.newLinkedList();
names.forEach(name -> locks.add(new OmsLockDO(name, NetUtils.getLocalHost())));
try {
omsLockRepository.saveAll(locks);
omsLockRepository.flush();
return true;
}catch (DataIntegrityViolationException ignore) {
}catch (Exception e) {
log.warn("[DatabaseLockService] write locks to database failed, lockNames = {}.", names, e);
}
return false;
}
@Override
public void batchUnLock(List<String> names) {
try {
CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockNames(names));
}catch (Exception e) {
log.error("[DatabaseLockService] unlocks {} failed.", names, e);
}
}
}

View File

@ -1,5 +1,7 @@
package com.github.kfcfans.oms.server.service.lock;
import java.util.List;
/**
* 锁服务所有方法都不允许抛出任何异常
*
@ -21,4 +23,7 @@ public interface LockService {
*/
void unlock(String name);
boolean batchLock(List<String> names);
void batchUnLock(List<String> names);
}

View File

@ -0,0 +1,17 @@
package com.github.kfcfans.oms.server.service.schedule;
import com.github.kfcfans.oms.server.common.utils.timewheel.HashedWheelTimer;
/**
* 时间轮单例
*
* @author tjq
* @since 2020/4/5
*/
public class HashedWheelTimerHolder {
public static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, 4);
private HashedWheelTimerHolder() {
}
}

View File

@ -0,0 +1,74 @@
package com.github.kfcfans.oms.server.service.schedule;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobLogRepository;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import com.github.kfcfans.oms.server.service.lock.LockService;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
/**
* 任务调度执行服务
*
* @author tjq
* @since 2020/4/5
*/
@Slf4j
@Service
public class JobScheduleService {
private static final int MAX_BATCH_NUM = 10;
@Resource
private LockService lockService;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private JobLogRepository jobLogRepository;
private static final String SCHEDULE_LOCK = "schedule_lock_%d";
private static final long SCHEDULE_RATE = 10000;
@Scheduled(fixedRate = SCHEDULE_RATE)
private void getJob() {
List<Long> allAppIds = WorkerManagerService.listAppIds();
if (CollectionUtils.isEmpty(allAppIds)) {
log.info("[JobScheduleService] current server has no app's job to schedule.");
return;
}
long timeThreshold = System.currentTimeMillis() + 2 * SCHEDULE_RATE;
Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> {
List<String> lockNames = partAppIds.stream().map(JobScheduleService::genLock).collect(Collectors.toList());
// 1. 先批量获取锁获取不到就改成单个循环模式
boolean batchLock = lockService.batchLock(lockNames);
if (!batchLock) {
}else {
try {
List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndNextTriggerTimeLessThanEqual(partAppIds, timeThreshold);
// 顺序先推入进时间轮 -> 写jobLog表 -> 更新nextTriggerTime原则宁可重复执行也不能不调度
}catch (Exception e) {
}
}
});
}
private static String genLock(Long appId) {
return String.format(SCHEDULE_LOCK, appId);
}
}

View File

@ -1,5 +1,7 @@
package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.service.ha.ServerSelectService;
import com.github.kfcfans.oms.server.web.ResultDTO;
import org.springframework.web.bind.annotation.GetMapping;
@ -20,10 +22,21 @@ public class ServerController {
@Resource
private ServerSelectService serverSelectService;
@Resource
private AppInfoRepository appInfoRepository;
@GetMapping("assert")
public ResultDTO<Long> assertAppName(String appName) {
AppInfoDO appInfo = appInfoRepository.findByAppName(appName);
if (appInfo == null) {
return ResultDTO.failed(appName + " is not registered!");
}
return ResultDTO.success(appInfo.getId());
}
@GetMapping("/acquire")
public ResultDTO<String> acquireServer(String appName) {
String server = serverSelectService.getServer(appName);
public ResultDTO<String> acquireServer(Long appId) {
String server = serverSelectService.getServer(appId);
return ResultDTO.success(server);
}

View File

@ -0,0 +1,43 @@
package com.github.kfcfans.oms.server.test;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.server.persistence.model.OmsLockDO;
import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository;
import org.assertj.core.util.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.List;
/**
* 数据库层测试
*
* @author tjq
* @since 2020/4/5
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class RepositoryTest {
@Resource
private OmsLockRepository omsLockRepository;
/**
* 需要证明批量写入失败后会回滚
*/
@Test
public void testBatchLock() {
List<OmsLockDO> locks = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
OmsLockDO lockDO = new OmsLockDO("lock" + i, NetUtils.getLocalHost());
locks.add(lockDO);
}
omsLockRepository.saveAll(locks);
omsLockRepository.flush();
}
}

View File

@ -1,12 +1,14 @@
package com.github.kfcfans.oms.server.test;
import com.github.kfcfans.oms.server.service.lock.LockService;
import org.assertj.core.util.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.List;
/**
* 服务测试
@ -30,4 +32,17 @@ public class ServiceTest {
lockService.unlock(lockName);
}
@Test
public void testBatchLock() {
List<String> lockNames = Lists.newArrayList("a", "b", "C", "d", "e");
System.out.println(lockService.batchLock(lockNames));
System.out.println(lockService.batchLock(lockNames));
}
@Test
public void testBatchUnLock() {
List<String> lockNames = Lists.newArrayList("a", "b", "C", "d", "e");
lockService.batchUnLock(lockNames);
}
}

View File

@ -46,6 +46,8 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
private static String workerAddress;
public static ActorSystem actorSystem;
@Getter
private static Long appId;
private static ScheduledExecutorService timingPool;
@Override

View File

@ -26,9 +26,12 @@ public class WorkerHealthReportRunnable implements Runnable {
SystemMetrics systemMetrics = SystemInfoUtils.getSystemMetrics();
WorkerHeartbeat heartbeat = new WorkerHeartbeat();
heartbeat.setSystemMetrics(systemMetrics);
heartbeat.setWorkerAddress(OhMyWorker.getWorkerAddress());
heartbeat.setAppName(OhMyWorker.getConfig().getAppName());
heartbeat.setAppId(OhMyWorker.getAppId());
heartbeat.setHeartbeatTime(System.currentTimeMillis());
// 发送请求
String serverPath = AkkaUtils.getAkkaServerNodePath(RemoteConstant.SERVER_ACTOR_NAME);

View File

@ -44,6 +44,8 @@ public class SystemInfoUtils {
metrics.setDiskTotal(bytes2GB(total));
metrics.setDiskUsage(metrics.getDiskUsed() / metrics.getDiskTotal());
// 在Worker完成分数计算减小Server压力
metrics.calculateScore();
return metrics;
}