feat: adopt a new server-id generation policy to solve the problem of long IDs #220

This commit is contained in:
tjq 2021-02-21 12:26:36 +08:00
parent 449608293c
commit 91bb28abbb
6 changed files with 207 additions and 83 deletions

View File

@ -1,14 +0,0 @@
package com.github.kfcfans.powerjob.server.extension;
/**
* provide unique server ip in the cluster for IdGenerateService
* @author user
*/
public interface ServerIdProvider {
/**
* get number for IdGenerateService
* @return serverId, must in range [0, 16384)
*/
long getServerId();
}

View File

@ -1,7 +1,14 @@
package com.github.kfcfans.powerjob.server.persistence.core.repository; package com.github.kfcfans.powerjob.server.persistence.core.repository;
import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import javax.transaction.Transactional;
import java.util.Date;
/** /**
* 服务器信息 数据操作层 * 服务器信息 数据操作层
@ -10,5 +17,23 @@ import org.springframework.data.jpa.repository.JpaRepository;
* @since 2020/4/15 * @since 2020/4/15
*/ */
public interface ServerInfoRepository extends JpaRepository<ServerInfoDO, Long> { public interface ServerInfoRepository extends JpaRepository<ServerInfoDO, Long> {
ServerInfoDO findByIp(String ip); ServerInfoDO findByIp(String ip);
@Transactional(rollbackOn = Exception.class)
@Modifying
@CanIgnoreReturnValue
@Query(value = "update ServerInfoDO set gmtModified = :gmtModified where ip = :ip")
int updateGmtModifiedByIp(@Param("ip") String ip, @Param("gmtModified") Date gmtModified);
@Transactional(rollbackOn = Exception.class)
@Modifying
@CanIgnoreReturnValue
@Query(value = "update ServerInfoDO set id = :id where ip = :ip")
int updateIdByIp(@Param("id") long id, @Param("ip") String ip);
@Transactional(rollbackOn = Exception.class)
@Modifying
@Query(value = "delete from ServerInfoDO where gmtModified < ?1")
int deleteByGmtModifiedBefore(Date threshold);
} }

View File

@ -0,0 +1,124 @@
package com.github.kfcfans.powerjob.server.remote.server;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.extension.LockService;
import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* management server info, like heartbeat, server id etc
*
* @author tjq
* @since 2021/2/21
*/
@Slf4j
@Service
public class ServerInfoService {
private final String ip;
private final long serverId;
private final ServerInfoRepository serverInfoRepository;
private static final long MAX_SERVER_CLUSTER_SIZE = 10000;
private static final String SERVER_INIT_LOCK = "server_init_lock";
private static final int SERVER_INIT_LOCK_MAX_TIME = 15000;
public long getServerId() {
return serverId;
}
@Autowired
public ServerInfoService(LockService lockService, ServerInfoRepository serverInfoRepository) {
this.ip = NetUtils.getLocalHost();
this.serverInfoRepository = serverInfoRepository;
Stopwatch sw = Stopwatch.createStarted();
while (!lockService.tryLock(SERVER_INIT_LOCK, SERVER_INIT_LOCK_MAX_TIME)) {
log.info("[ServerInfoService] waiting for lock: {}", SERVER_INIT_LOCK);
CommonUtils.easySleep(100);
}
try {
// register server then get server_id
ServerInfoDO server = serverInfoRepository.findByIp(ip);
if (server == null) {
ServerInfoDO newServerInfo = new ServerInfoDO(ip);
server = serverInfoRepository.saveAndFlush(newServerInfo);
} else {
serverInfoRepository.updateGmtModifiedByIp(ip, new Date());
}
if (server.getId() < MAX_SERVER_CLUSTER_SIZE) {
this.serverId = server.getId();
} else {
this.serverId = retryServerId();
serverInfoRepository.updateIdByIp(this.serverId, ip);
}
} catch (Exception e) {
log.error("[ServerInfoService] init server failed", e);
throw e;
} finally {
lockService.unlock(SERVER_INIT_LOCK);
}
log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverId, sw);
}
@Scheduled(fixedRate = 15000, initialDelay = 15000)
public void heartbeat() {
serverInfoRepository.updateGmtModifiedByIp(ip, new Date());
}
private long retryServerId() {
List<ServerInfoDO> serverInfoList = serverInfoRepository.findAll();
log.info("[ServerInfoService] current server record num in database: {}", serverInfoList.size());
// clean inactive server record first
if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) {
// use a large time interval to prevent valid records from being deleted when the local time is inaccurate
Date oneDayAgo = DateUtils.addDays(new Date(), -1);
int delNum =serverInfoRepository.deleteByGmtModifiedBefore(oneDayAgo);
log.warn("[ServerInfoService] delete invalid {} server info record before {}", delNum, oneDayAgo);
serverInfoList = serverInfoRepository.findAll();
}
if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) {
throw new PowerJobException(String.format("The powerjob-server cluster cannot accommodate %d machines, please rebuild another cluster", serverInfoList.size()));
}
Set<Long> uedServerIds = serverInfoList.stream().map(ServerInfoDO::getId).collect(Collectors.toSet());
for (long i = 1; i <= MAX_SERVER_CLUSTER_SIZE; i++) {
if (uedServerIds.contains(i)) {
continue;
}
log.info("[ServerInfoService] ID[{}] is not used yet, try as new server id", i);
return i;
}
throw new PowerJobException("impossible");
}
}

View File

@ -1,37 +0,0 @@
package com.github.kfcfans.powerjob.server.service.id;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.extension.ServerIdProvider;
import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* 默认服务器 ID 生成策略不适用于 Server 频繁重启且变化 IP 的场景
* @author user
*/
@Slf4j
@Service
public class DefaultServerIdProvider implements ServerIdProvider {
private final Long id;
public DefaultServerIdProvider(ServerInfoRepository serverInfoRepository) {
String ip = NetUtils.getLocalHost();
ServerInfoDO server = serverInfoRepository.findByIp(ip);
if (server == null) {
ServerInfoDO newServerInfo = new ServerInfoDO(ip);
server = serverInfoRepository.saveAndFlush(newServerInfo);
}
this.id = server.getId();
log.info("[DefaultServerIdProvider] address:{},id:{}", ip, id);
}
@Override
public long getServerId() {
return id;
}
}

View File

@ -1,14 +1,9 @@
package com.github.kfcfans.powerjob.server.service.id; package com.github.kfcfans.powerjob.server.service.id;
import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.server.remote.server.ServerInfoService;
import com.github.kfcfans.powerjob.server.extension.ServerIdProvider;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
/** /**
* 唯一ID生成服务使用 Twitter snowflake 算法 * 唯一ID生成服务使用 Twitter snowflake 算法
@ -26,34 +21,11 @@ public class IdGenerateService {
private static final int DATA_CENTER_ID = 0; private static final int DATA_CENTER_ID = 0;
@Autowired @Autowired
public IdGenerateService(List<ServerIdProvider> serverIdProviders) { public IdGenerateService(ServerInfoService serverInfoService) {
if (CollectionUtils.isEmpty(serverIdProviders)) { long id = serverInfoService.getServerId();
throw new PowerJobException("can't find any ServerIdProvider!");
}
ServerIdProvider serverIdProvider;
int severIpProviderNums = serverIdProviders.size();
if (severIpProviderNums == 1) {
serverIdProvider = serverIdProviders.get(0);
} else {
List<ServerIdProvider> extendServerIpProviders = Lists.newArrayList();
for (ServerIdProvider sp : serverIdProviders) {
if (sp instanceof DefaultServerIdProvider) {
continue;
}
extendServerIpProviders.add(sp);
}
int extNum = extendServerIpProviders.size();
if (extNum != 1) {
throw new PowerJobException(String.format("find %d ServerIdProvider but just need one, please delete the useless ServerIdProvider!", extNum));
}
serverIdProvider = extendServerIpProviders.get(0);
}
long id = serverIdProvider.getServerId();
snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id); snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);
log.info("[IdGenerateService] initialize IdGenerateService successfully, ServerIdProvider:{},ID:{}", serverIdProvider.getClass().getSimpleName(), id); log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id);
} }
/** /**

View File

@ -0,0 +1,54 @@
package com.github.kfcfans.powerjob.server.remote.server;
import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.time.DateUtils;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import static org.junit.jupiter.api.Assertions.*;
/**
* test server info
*
* @author tjq
* @since 2021/2/21
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class ServerInfoServiceTest {
@Resource
private ServerInfoRepository serverInfoRepository;
@Test
void generateInvalidRecord2Test() {
List<ServerInfoDO> records = Lists.newLinkedList();
for (int i = 0; i < 11111; i++) {
// invalid ip to test
String ip = "T-192.168.1." + i;
Date gmtModified = DateUtils.addHours(new Date(), -ThreadLocalRandom.current().nextInt(1, 48));
ServerInfoDO serverInfoDO = new ServerInfoDO(ip);
serverInfoDO.setGmtModified(gmtModified);
records.add(serverInfoDO);
}
serverInfoRepository.saveAll(records);
serverInfoRepository.flush();
}
}