fix: DatabaseLock can't unlock when timeout

This commit is contained in:
tjq 2021-01-02 13:14:25 +08:00
parent 3ecefd22cb
commit 9a661aa177

View File

@ -4,14 +4,11 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.OmsLockDO; import com.github.kfcfans.powerjob.server.persistence.core.model.OmsLockDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.OmsLockRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.OmsLockRepository;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataIntegrityViolationException; import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* 基于数据库实现的分布式锁 * 基于数据库实现的分布式锁
@ -26,35 +23,27 @@ public class DatabaseLockService implements LockService {
@Resource @Resource
private OmsLockRepository omsLockRepository; private OmsLockRepository omsLockRepository;
private Map<String, AtomicInteger> lockName2FailedTimes = Maps.newConcurrentMap();
private static final int MAX_FAILED_NUM = 5;
@Override @Override
public boolean lock(String name, long maxLockTime) { public boolean lock(String name, long maxLockTime) {
AtomicInteger failedCount = lockName2FailedTimes.computeIfAbsent(name, ignore -> new AtomicInteger(0));
OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost(), maxLockTime); OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost(), maxLockTime);
try { try {
omsLockRepository.saveAndFlush(newLock); omsLockRepository.saveAndFlush(newLock);
failedCount.set(0);
return true; return true;
}catch (DataIntegrityViolationException ignore) { } catch (DataIntegrityViolationException ignore) {
}catch (Exception e) { } catch (Exception e) {
log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e); log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e);
} }
// 连续失败一段时间需要判断是否为锁释放失败的情况 OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
if (failedCount.incrementAndGet() > MAX_FAILED_NUM) { long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();
OmsLockDO omsLockDO = omsLockRepository.findByLockName(name); // 锁超时强制释放锁并重新尝试获取
long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime(); if (lockedMillions > omsLockDO.getMaxLockTime()) {
if (lockedMillions > omsLockDO.getMaxLockTime()) {
log.warn("[DatabaseLockService] The lock({}) already timeout, will be deleted now.", omsLockDO); log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO);
unlock(name); unlock(name);
} else { return lock(name, maxLockTime);
failedCount.set(0);
}
} }
return false; return false;
} }