diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/extension/ServerIdProvider.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/extension/ServerIdProvider.java deleted file mode 100644 index 62751040..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/extension/ServerIdProvider.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.github.kfcfans.powerjob.server.extension; - -/** - * provide unique server ip in the cluster for IdGenerateService - * @author user - */ -public interface ServerIdProvider { - - /** - * get number for IdGenerateService - * @return serverId, must in range [0, 16384) - */ - long getServerId(); -} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ServerInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ServerInfoRepository.java index 615933f2..0588d00f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ServerInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ServerInfoRepository.java @@ -1,7 +1,14 @@ package com.github.kfcfans.powerjob.server.persistence.core.repository; import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import javax.transaction.Transactional; +import java.util.Date; /** * 服务器信息 数据操作层 @@ -10,5 +17,23 @@ import org.springframework.data.jpa.repository.JpaRepository; * @since 2020/4/15 */ public interface ServerInfoRepository extends JpaRepository { + ServerInfoDO findByIp(String ip); + + @Transactional(rollbackOn = Exception.class) + @Modifying + @CanIgnoreReturnValue + @Query(value = "update ServerInfoDO set gmtModified = :gmtModified where ip = :ip") + int updateGmtModifiedByIp(@Param("ip") String ip, @Param("gmtModified") Date gmtModified); + + @Transactional(rollbackOn = Exception.class) + @Modifying + @CanIgnoreReturnValue + @Query(value = "update ServerInfoDO set id = :id where ip = :ip") + int updateIdByIp(@Param("id") long id, @Param("ip") String ip); + + @Transactional(rollbackOn = Exception.class) + @Modifying + @Query(value = "delete from ServerInfoDO where gmtModified < ?1") + int deleteByGmtModifiedBefore(Date threshold); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/ServerInfoService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/ServerInfoService.java new file mode 100644 index 00000000..f352b782 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/ServerInfoService.java @@ -0,0 +1,124 @@ +package com.github.kfcfans.powerjob.server.remote.server; + +import com.github.kfcfans.powerjob.common.PowerJobException; +import com.github.kfcfans.powerjob.common.utils.CommonUtils; +import com.github.kfcfans.powerjob.common.utils.NetUtils; +import com.github.kfcfans.powerjob.server.extension.LockService; +import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository; +import com.google.common.base.Stopwatch; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.DateUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * management server info, like heartbeat, server id etc + * + * @author tjq + * @since 2021/2/21 + */ +@Slf4j +@Service +public class ServerInfoService { + + private final String ip; + private final long serverId; + + private final ServerInfoRepository serverInfoRepository; + + private static final long MAX_SERVER_CLUSTER_SIZE = 10000; + + private static final String SERVER_INIT_LOCK = "server_init_lock"; + private static final int SERVER_INIT_LOCK_MAX_TIME = 15000; + + public long getServerId() { + return serverId; + } + + @Autowired + public ServerInfoService(LockService lockService, ServerInfoRepository serverInfoRepository) { + + this.ip = NetUtils.getLocalHost(); + this.serverInfoRepository = serverInfoRepository; + + Stopwatch sw = Stopwatch.createStarted(); + + while (!lockService.tryLock(SERVER_INIT_LOCK, SERVER_INIT_LOCK_MAX_TIME)) { + log.info("[ServerInfoService] waiting for lock: {}", SERVER_INIT_LOCK); + CommonUtils.easySleep(100); + } + + try { + + // register server then get server_id + ServerInfoDO server = serverInfoRepository.findByIp(ip); + if (server == null) { + ServerInfoDO newServerInfo = new ServerInfoDO(ip); + server = serverInfoRepository.saveAndFlush(newServerInfo); + } else { + serverInfoRepository.updateGmtModifiedByIp(ip, new Date()); + } + + if (server.getId() < MAX_SERVER_CLUSTER_SIZE) { + this.serverId = server.getId(); + } else { + this.serverId = retryServerId(); + serverInfoRepository.updateIdByIp(this.serverId, ip); + } + + } catch (Exception e) { + log.error("[ServerInfoService] init server failed", e); + throw e; + } finally { + lockService.unlock(SERVER_INIT_LOCK); + } + + log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverId, sw); + } + + @Scheduled(fixedRate = 15000, initialDelay = 15000) + public void heartbeat() { + serverInfoRepository.updateGmtModifiedByIp(ip, new Date()); + } + + + private long retryServerId() { + + List serverInfoList = serverInfoRepository.findAll(); + + log.info("[ServerInfoService] current server record num in database: {}", serverInfoList.size()); + + // clean inactive server record first + if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) { + + // use a large time interval to prevent valid records from being deleted when the local time is inaccurate + Date oneDayAgo = DateUtils.addDays(new Date(), -1); + int delNum =serverInfoRepository.deleteByGmtModifiedBefore(oneDayAgo); + log.warn("[ServerInfoService] delete invalid {} server info record before {}", delNum, oneDayAgo); + + serverInfoList = serverInfoRepository.findAll(); + } + + if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) { + throw new PowerJobException(String.format("The powerjob-server cluster cannot accommodate %d machines, please rebuild another cluster", serverInfoList.size())); + } + + Set uedServerIds = serverInfoList.stream().map(ServerInfoDO::getId).collect(Collectors.toSet()); + for (long i = 1; i <= MAX_SERVER_CLUSTER_SIZE; i++) { + if (uedServerIds.contains(i)) { + continue; + } + + log.info("[ServerInfoService] ID[{}] is not used yet, try as new server id", i); + return i; + } + throw new PowerJobException("impossible"); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/DefaultServerIdProvider.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/DefaultServerIdProvider.java deleted file mode 100644 index 41ba4ac9..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/DefaultServerIdProvider.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.github.kfcfans.powerjob.server.service.id; - -import com.github.kfcfans.powerjob.common.utils.NetUtils; -import com.github.kfcfans.powerjob.server.extension.ServerIdProvider; -import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO; -import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -/** - * 默认服务器 ID 生成策略,不适用于 Server 频繁重启且变化 IP 的场景 - * @author user - */ -@Slf4j -@Service -public class DefaultServerIdProvider implements ServerIdProvider { - - private final Long id; - - public DefaultServerIdProvider(ServerInfoRepository serverInfoRepository) { - String ip = NetUtils.getLocalHost(); - ServerInfoDO server = serverInfoRepository.findByIp(ip); - - if (server == null) { - ServerInfoDO newServerInfo = new ServerInfoDO(ip); - server = serverInfoRepository.saveAndFlush(newServerInfo); - } - this.id = server.getId(); - - log.info("[DefaultServerIdProvider] address:{},id:{}", ip, id); - } - - @Override - public long getServerId() { - return id; - } -} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/IdGenerateService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/IdGenerateService.java index 73565360..ad6e2f08 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/IdGenerateService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/IdGenerateService.java @@ -1,14 +1,9 @@ package com.github.kfcfans.powerjob.server.service.id; -import com.github.kfcfans.powerjob.common.PowerJobException; -import com.github.kfcfans.powerjob.server.extension.ServerIdProvider; -import com.google.common.collect.Lists; +import com.github.kfcfans.powerjob.server.remote.server.ServerInfoService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.util.CollectionUtils; - -import java.util.List; /** * 唯一ID生成服务,使用 Twitter snowflake 算法 @@ -26,34 +21,11 @@ public class IdGenerateService { private static final int DATA_CENTER_ID = 0; @Autowired - public IdGenerateService(List serverIdProviders) { + public IdGenerateService(ServerInfoService serverInfoService) { - if (CollectionUtils.isEmpty(serverIdProviders)) { - throw new PowerJobException("can't find any ServerIdProvider!"); - } - - ServerIdProvider serverIdProvider; - int severIpProviderNums = serverIdProviders.size(); - if (severIpProviderNums == 1) { - serverIdProvider = serverIdProviders.get(0); - } else { - List extendServerIpProviders = Lists.newArrayList(); - for (ServerIdProvider sp : serverIdProviders) { - if (sp instanceof DefaultServerIdProvider) { - continue; - } - extendServerIpProviders.add(sp); - } - int extNum = extendServerIpProviders.size(); - if (extNum != 1) { - throw new PowerJobException(String.format("find %d ServerIdProvider but just need one, please delete the useless ServerIdProvider!", extNum)); - } - serverIdProvider = extendServerIpProviders.get(0); - } - - long id = serverIdProvider.getServerId(); + long id = serverInfoService.getServerId(); snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id); - log.info("[IdGenerateService] initialize IdGenerateService successfully, ServerIdProvider:{},ID:{}", serverIdProvider.getClass().getSimpleName(), id); + log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id); } /** diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/remote/server/ServerInfoServiceTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/remote/server/ServerInfoServiceTest.java new file mode 100644 index 00000000..88a1f7d1 --- /dev/null +++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/remote/server/ServerInfoServiceTest.java @@ -0,0 +1,54 @@ +package com.github.kfcfans.powerjob.server.remote.server; + +import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.time.DateUtils; +import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import javax.annotation.Resource; + +import java.util.Date; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * test server info + * + * @author tjq + * @since 2021/2/21 + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +class ServerInfoServiceTest { + + @Resource + private ServerInfoRepository serverInfoRepository; + + @Test + void generateInvalidRecord2Test() { + + List records = Lists.newLinkedList(); + for (int i = 0; i < 11111; i++) { + + // invalid ip to test + String ip = "T-192.168.1." + i; + + Date gmtModified = DateUtils.addHours(new Date(), -ThreadLocalRandom.current().nextInt(1, 48)); + + ServerInfoDO serverInfoDO = new ServerInfoDO(ip); + serverInfoDO.setGmtModified(gmtModified); + + records.add(serverInfoDO); + } + + serverInfoRepository.saveAll(records); + serverInfoRepository.flush(); + } + +} \ No newline at end of file