回滚GridFsManager的代码把锁加在CleanService上

This commit is contained in:
ocean23 2021-01-02 12:38:19 +08:00
parent 0e77a23e76
commit ead9f08e52
2 changed files with 13 additions and 17 deletions

View File

@ -1,7 +1,6 @@
package com.github.kfcfans.powerjob.server.persistence.mongodb; package com.github.kfcfans.powerjob.server.persistence.mongodb;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.service.lock.LockService;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.mongodb.client.MongoDatabase; import com.mongodb.client.MongoDatabase;
@ -39,8 +38,6 @@ public class GridFsManager implements InitializingBean {
@Resource @Resource
private Environment environment; private Environment environment;
@Resource
private LockService lockService;
private MongoDatabase db; private MongoDatabase db;
private boolean available; private boolean available;
@ -110,22 +107,9 @@ public class GridFsManager implements InitializingBean {
* @param day 日期偏移量单位 * @param day 日期偏移量单位
*/ */
public void deleteBefore(String bucketName, int day) { public void deleteBefore(String bucketName, int day) {
String deleteFsLock = "deleteFsLock";
// 只要第一个server抢到锁其他server就会返回所以锁10分钟应该足够了
boolean lock = lockService.lock(deleteFsLock, 10 * 60 * 1000);
if (!lock) {
log.info("[GridFsManager] deleted task is running, it's ok to return.");
return;
}
try{
deleteHistoryFile(bucketName, day);
}finally {
lockService.unlock(deleteFsLock);
}
}
private void deleteHistoryFile(String bucketName, int day) {
Stopwatch sw = Stopwatch.createStarted(); Stopwatch sw = Stopwatch.createStarted();
Date date = DateUtils.addDays(new Date(), -day); Date date = DateUtils.addDays(new Date(), -day);
GridFSBucket bucket = getBucket(bucketName); GridFSBucket bucket = getBucket(bucketName);
Bson filter = Filters.lt("uploadDate", date); Bson filter = Filters.lt("uploadDate", date);

View File

@ -7,6 +7,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager; import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.lock.LockService;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -36,6 +37,8 @@ public class CleanService {
private InstanceInfoRepository instanceInfoRepository; private InstanceInfoRepository instanceInfoRepository;
@Resource @Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Resource
private LockService lockService;
@Value("${oms.instanceinfo.retention}") @Value("${oms.instanceinfo.retention}")
private int instanceInfoRetentionDay; private int instanceInfoRetentionDay;
@ -113,11 +116,20 @@ public class CleanService {
return; return;
} }
if (gridFsManager.available()) { if (gridFsManager.available()) {
String deleteFsLock = "deleteFsLock";
// 只要第一个server抢到锁其他server就会返回所以锁10分钟应该足够了
boolean lock = lockService.lock(deleteFsLock, 10 * 60 * 1000);
if (!lock) {
log.info("[GridFsManager] deleted task is running, it's ok to return.");
return;
}
Stopwatch stopwatch = Stopwatch.createStarted(); Stopwatch stopwatch = Stopwatch.createStarted();
try { try {
gridFsManager.deleteBefore(bucketName, day); gridFsManager.deleteBefore(bucketName, day);
}catch (Exception e) { }catch (Exception e) {
log.warn("[CleanService] clean remote bucket({}) failed.", bucketName, e); log.warn("[CleanService] clean remote bucket({}) failed.", bucketName, e);
}finally {
lockService.unlock(deleteFsLock);
} }
log.info("[CleanService] clean remote bucket({}) successfully, using {}.", bucketName, stopwatch.stop()); log.info("[CleanService] clean remote bucket({}) successfully, using {}.", bucketName, stopwatch.stop());
} }