Finished the database lock service (DatabaseLockService) and server election

This commit is contained in:
tjq 2020-04-05 15:34:12 +08:00
parent 03f167cb44
commit c064602648
18 changed files with 322 additions and 65 deletions

View File

@ -17,4 +17,5 @@ import java.io.Serializable;
@AllArgsConstructor
public class AskResponse implements Serializable {
private boolean success;
private Object extra;
}

View File

@ -21,6 +21,7 @@
<hikaricp.version>3.4.2</hikaricp.version>
<mysql.version>8.0.19</mysql.version>
<commons.lang.version>3.10</commons.lang.version>
<curator.version>4.3.0</curator.version>
</properties>
<dependencies>
@ -53,6 +54,14 @@
<version>${akka.version}</version>
</dependency>
<!-- curator -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>

View File

@ -0,0 +1,36 @@
package com.github.kfcfans.oms.server.common.config;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* ZooKeeper 连接配置
*
* @author tjq
* @since 2020/4/4
*/
@Configuration
public class CuratorConfig {
@Value("${zookeeper.address}")
private String zkAddress;
@Bean("omsCurator")
public CuratorFramework initCurator() {
CuratorFramework client = CuratorFrameworkFactory.builder()
.namespace("oms")
// zookeeper 地址多值用 , 分割即可
.connectString(zkAddress)
.sessionTimeoutMs(1000)
.connectionTimeoutMs(1000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
return client;
}
}

View File

@ -1,5 +1,7 @@
package com.github.kfcfans.oms.server.core.actors;
package com.github.kfcfans.oms.server.core.akka;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.github.kfcfans.common.RemoteConstant;
@ -7,6 +9,7 @@ import com.github.kfcfans.common.utils.NetUtils;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@ -21,6 +24,7 @@ import java.util.Map;
public class OhMyServer {
public static ActorSystem actorSystem;
@Getter
private static String actorSystemAddress;
public void init() {
@ -41,4 +45,14 @@ public class OhMyServer {
actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME);
}
/**
 * Resolves the ServerActor of the server node at the given address.
 *
 * @param address target node, in IP:port form
 * @return an ActorSelection for akka://{system}@{address}/user/{actor}
 */
public static ActorSelection getServerActor(String address) {
    String actorPath = "akka://" + RemoteConstant.SERVER_ACTOR_SYSTEM_NAME
            + "@" + address
            + "/user/" + RemoteConstant.SERVER_ACTOR_NAME;
    return actorSystem.actorSelection(actorPath);
}
}

View File

@ -0,0 +1,16 @@
package com.github.kfcfans.oms.server.core.akka;
import lombok.Data;
import java.io.Serializable;
/**
 * Liveness probe used to check whether a target machine is alive.
 *
 * @author tjq
 * @since 2020/4/5
 */
@Data
public class Ping implements Serializable {
// Timestamp (epoch millis) filled in by the sender; ServerActor subtracts it from its own clock when replying.
private long currentTime;
}

View File

@ -1,7 +1,8 @@
package com.github.kfcfans.oms.server.core.actors;
package com.github.kfcfans.oms.server.core.akka;
import akka.actor.AbstractActor;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.common.response.AskResponse;
import lombok.extern.slf4j.Slf4j;
/**
@ -17,13 +18,22 @@ public class ServerActor extends AbstractActor {
// Message dispatch: worker heartbeats and liveness pings have dedicated handlers,
// anything else is only logged as an unknown request.
public Receive createReceive() {
    return receiveBuilder()
            .match(WorkerHeartbeat.class, heartbeat -> onReceiveWorkerHeartbeat(heartbeat))
            .match(Ping.class, ping -> onReceivePing(ping))
            .matchAny(unknown -> log.warn("[ServerActor] receive unknown request: {}.", unknown))
            .build();
}
/**
 * Handles a liveness-check request by replying to the sender with a successful
 * {@link AskResponse}; {@code extra} carries this node's current time minus the
 * timestamp found in the ping.
 *
 * @param ping the liveness-check request
 */
private void onReceivePing(Ping ping) {
    // NOTE(review): both timestamps come from different machines, so this value includes any clock skew.
    long elapsedMs = System.currentTimeMillis() - ping.getCurrentTime();

    AskResponse pong = new AskResponse();
    pong.setExtra(elapsedMs);
    pong.setSuccess(true);
    getSender().tell(pong, getSelf());
}
// Handler for worker heartbeats. The body is empty in this revision, so heartbeats are accepted and discarded.
private void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) {
}
}

View File

@ -23,6 +23,8 @@ public class AppInfoDO {
private String appName;
private String description;
private String currentServer;
private Date gmtCreate;
private Date gmtModified;
}

View File

@ -23,14 +23,14 @@ public class OmsLockDO {
private Long id;
private String lockName;
private String owner;
private String ownerIP;
private Date gmtCreate;
private Date gmtModified;
public OmsLockDO(String lockName, String owner) {
public OmsLockDO(String lockName, String ownerIP) {
this.lockName = lockName;
this.owner = owner;
this.ownerIP = ownerIP;
this.gmtCreate = new Date();
this.gmtModified = this.gmtCreate;
}

View File

@ -10,4 +10,6 @@ import org.springframework.data.jpa.repository.JpaRepository;
* @since 2020/4/1
*/
public interface AppInfoRepository extends JpaRepository<AppInfoDO, Long> {
// Looks up an application by its name (Spring Data derived query).
// Callers in this change treat a null result as "application not registered".
AppInfoDO findByAppName(String appName);
}

View File

@ -19,4 +19,6 @@ public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> {
@Transactional
@Query(value = "delete from oms_lock where lock_name = ?1", nativeQuery = true)
int deleteByLockName(String lockName);
OmsLockDO findByLockName(String lockName);
}

View File

@ -0,0 +1,111 @@
package com.github.kfcfans.oms.server.service.ha;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.oms.server.core.akka.Ping;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.service.lock.LockService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
 * Assigns a server to the workers of an application (server election).
 *
 * @author tjq
 * @since 2020/4/5
 */
@Slf4j
@Service
public class ServerSelectService {

    @Resource
    private LockService lockService;
    @Resource
    private AppInfoRepository appInfoRepository;

    /** Maximum number of election rounds before giving up. */
    private static final int RETRY_TIMES = 10;
    /** Timeout of a single liveness ping, in milliseconds. */
    private static final long PING_TIMEOUT_MS = 5000;
    /** Pause before retrying when the election lock is held by someone else, in milliseconds. */
    private static final long WAIT_LOCK_TIME = 1000;
    /** Lock name pattern; the placeholder is the application name. */
    private static final String SERVER_ELECT_LOCK = "server_elect_%s";

    /**
     * Returns the server currently responsible for an application, electing this node
     * if the recorded server does not answer a ping.
     * <p>
     * Known drawback: if a server that was considered dead comes back to life, the worker
     * cluster may split-brain. The impact is judged small and the probability very low,
     * so this case is deliberately not handled.
     *
     * @param appName application name
     * @return address of a currently available server
     * @throws RuntimeException if the application is not registered, the calling thread is
     *                          interrupted, or no server could be elected within {@value #RETRY_TIMES} rounds
     */
    public String getServer(String appName) {
        for (int i = 0; i < RETRY_TIMES; i++) {

            // Lock-free read of the server currently recorded in the database.
            AppInfoDO app = appInfoRepository.findByAppName(appName);
            if (app == null) {
                throw new RuntimeException(appName + " is not registered!");
            }
            String originServer = app.getCurrentServer();
            if (isActive(originServer)) {
                return originServer;
            }

            // The recorded server is unreachable: a new election is needed, which requires the lock.
            String lockName = String.format(SERVER_ELECT_LOCK, appName);
            if (!lockService.lock(lockName)) {
                waitForLock(appName);
                continue;
            }
            try {
                // Another machine may have finished the election while we were waiting, so check again.
                AppInfoDO appInfo = appInfoRepository.findByAppName(appName);
                if (appInfo == null) {
                    // The app vanished in between; the check at the top of the loop reports it.
                    continue;
                }
                if (isActive(appInfo.getCurrentServer())) {
                    return appInfo.getCurrentServer();
                }

                // Take over: record this node as the server of the application.
                appInfo.setCurrentServer(OhMyServer.getActorSystemAddress());
                appInfo.setGmtModified(new Date());
                appInfoRepository.saveAndFlush(appInfo);
                return appInfo.getCurrentServer();
            } catch (Exception e) {
                // Keep the cause in the log; the next round retries the election.
                log.warn("[ServerSelectService] write new server to db failed for app {}.", appName, e);
            } finally {
                lockService.unlock(lockName);
            }
        }
        throw new RuntimeException("server elect failed for app " + appName);
    }

    /**
     * Pauses before the next attempt to take the election lock.
     * An interrupt is not swallowed: the flag is restored and the election is aborted.
     */
    private void waitForLock(String appName) {
        try {
            Thread.sleep(WAIT_LOCK_TIME);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("server elect interrupted for app " + appName, e);
        }
    }

    /**
     * Pings the ServerActor at the given address and reports whether it answered successfully
     * within {@value #PING_TIMEOUT_MS} ms.
     *
     * @param serverAddress server address (IP:port); empty or null counts as inactive
     * @return true only if a successful AskResponse was received in time
     */
    private boolean isActive(String serverAddress) {
        if (StringUtils.isEmpty(serverAddress)) {
            return false;
        }

        Ping ping = new Ping();
        ping.setCurrentTime(System.currentTimeMillis());

        ActorSelection serverActor = OhMyServer.getServerActor(serverAddress);
        try {
            CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
            AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            return response.isSuccess();
        } catch (InterruptedException e) {
            // An interrupt says nothing about the remote server; do not trigger an election because of it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while pinging server " + serverAddress, e);
        } catch (Exception e) {
            log.warn("[ServerSelectService] server({}) was down, try to elect a new server.", serverAddress, e);
        }
        return false;
    }
}

View File

@ -1,52 +0,0 @@
package com.github.kfcfans.oms.server.service.impl;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.server.persistence.model.OmsLockDO;
import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository;
import com.github.kfcfans.oms.server.service.LockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * Distributed lock implemented on top of the database.
 * (This is the removed version of the file, formerly in the service.impl package.)
 *
 * @author tjq
 * @since 2020/4/2
 */
@Slf4j
@Service
public class DatabaseLockService implements LockService {
@Resource
private OmsLockRepository omsLockRepository;
// Tries to take the lock by inserting a row for it; returns true only if the insert succeeded.
@Override
public boolean lock(String name) {
try {
OmsLockDO lock = new OmsLockDO(name, NetUtils.getLocalHost());
omsLockRepository.saveAndFlush(lock);
return true;
}catch (DataIntegrityViolationException ignore) {
// Treated as "someone else already holds the lock" — presumably a uniqueness conflict on the lock name.
log.info("[DatabaseLockService] other thread get the lock {}.", name);
} catch (Exception e) {
log.error("[DatabaseLockService] lock {} failed.", name, e);
}
return false;
}
// Releases the lock by deleting its row (with retry); failures are logged, never thrown.
@Override
public void unlock(String name) {
try {
CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockName(name));
}catch (Exception e) {
log.error("[DatabaseLockService] unlock {} failed.", name, e);
}
}
}

View File

@ -0,0 +1,73 @@
package com.github.kfcfans.oms.server.service.lock;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.server.persistence.model.OmsLockDO;
import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Distributed lock implemented on top of the database.
 * <p>
 * Taking the lock means inserting a row named after the lock; releasing it means deleting
 * that row. As required by {@link LockService}, no method of this class lets an exception escape.
 *
 * @author tjq
 * @since 2020/4/5
 */
@Slf4j
@Service
public class DatabaseLockService implements LockService {

    @Resource
    private OmsLockRepository omsLockRepository;

    /** Consecutive failed attempts of this node, per lock name. */
    private final Map<String, AtomicInteger> lockName2FailedTimes = Maps.newConcurrentMap();

    /** Number of consecutive failures after which the existing lock row is checked for expiry. */
    private static final int MAX_FAILED_NUM = 5;
    /** A lock may be held for at most 30 seconds. */
    private static final long LOCK_TIMEOUT_MS = 30000;

    /**
     * Tries to acquire the lock once, without blocking.
     *
     * @param name lock name
     * @return true if the lock row was written by this call, false otherwise
     */
    @Override
    public boolean lock(String name) {
        AtomicInteger failedCount = lockName2FailedTimes.computeIfAbsent(name, ignore -> new AtomicInteger(0));
        try {
            // Built inside the try so that a failure resolving the local host cannot escape either.
            OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost());
            omsLockRepository.saveAndFlush(newLock);
            failedCount.set(0);
            return true;
        } catch (DataIntegrityViolationException ignored) {
            // Expected while the lock is held by someone else
            // (presumably a uniqueness conflict on lock_name — confirm against the table definition).
        } catch (Exception e) {
            log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e);
        }

        // After failing for a while, check whether the holder simply failed to release the lock.
        if (failedCount.incrementAndGet() > MAX_FAILED_NUM) {
            releaseIfExpired(name, failedCount);
        }
        return false;
    }

    /**
     * Deletes the lock row if it is older than {@link #LOCK_TIMEOUT_MS}; otherwise resets the failure counter.
     * Never throws: lookup problems are logged and the check is retried on a later failure.
     */
    private void releaseIfExpired(String name, AtomicInteger failedCount) {
        try {
            OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
            if (omsLockDO == null) {
                // The holder released the lock between our failed insert and this lookup.
                failedCount.set(0);
                return;
            }
            long lockedMillis = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();
            if (lockedMillis > LOCK_TIMEOUT_MS) {
                log.warn("[DatabaseLockService] The lock({}) already timeout, will be deleted now.", omsLockDO);
                // NOTE(review): deletion is by name only, so a lock re-acquired by another node
                // between the lookup and the delete would be removed as well.
                unlock(name);
            } else {
                failedCount.set(0);
            }
        } catch (Exception e) {
            log.warn("[DatabaseLockService] check timeout of lock {} failed.", name, e);
        }
    }

    /**
     * Releases the lock by deleting its row, retrying on failure; errors are logged, not thrown.
     *
     * @param name lock name
     */
    @Override
    public void unlock(String name) {
        try {
            CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockName(name));
        } catch (Exception e) {
            log.error("[DatabaseLockService] unlock {} failed.", name, e);
        }
    }
}

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.oms.server.service;
package com.github.kfcfans.oms.server.service.lock;
/**
* 锁服务
* 锁服务所有方法都不允许抛出任何异常
*
* @author tjq
* @since 2020/4/2

View File

@ -19,6 +19,6 @@ public class ControllerExceptionHandler {
@ExceptionHandler(Exception.class)
public ResultDTO<Void> exceptionHandler(Exception e) {
log.error("[ControllerException] http request failed.", e);
return ResultDTO.failed(e);
return ResultDTO.failed(e.getMessage());
}
}

View File

@ -0,0 +1,30 @@
package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.server.service.ha.ServerSelectService;
import com.github.kfcfans.oms.server.web.ResultDTO;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * Controller that serves internal requests.
 *
 * @author tjq
 * @since 2020/4/4
 */
@RestController
@RequestMapping("/server")
public class ServerController {

    @Resource
    private ServerSelectService serverSelectService;

    /**
     * Returns the server assigned to the given application, as selected by {@link ServerSelectService}.
     *
     * @param appName application name, bound from the request parameter of the same name
     * @return a successful result wrapping the server address
     */
    @GetMapping("/acquire")
    public ResultDTO<String> acquireServer(String appName) {
        return ResultDTO.success(serverSelectService.getServer(appName));
    }
}

View File

@ -12,3 +12,6 @@ spring.datasource.hikari.minimum-idle=5
# JPA 相关配置
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
# ZooKeeper connect string (separate multiple addresses with commas)
zookeeper.address=115.159.215.229:2181

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.oms.server.test;
import com.github.kfcfans.oms.server.service.LockService;
import com.github.kfcfans.oms.server.service.lock.LockService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;