mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: [storageExt] finished MySqlSeriesDfsService
This commit is contained in:
parent
37a62549db
commit
ad08406d0b
@ -7,6 +7,7 @@ import tech.powerjob.server.extension.dfs.DFsService;
|
|||||||
import tech.powerjob.server.persistence.storage.impl.AliOssService;
|
import tech.powerjob.server.persistence.storage.impl.AliOssService;
|
||||||
import tech.powerjob.server.persistence.storage.impl.EmptyDFsService;
|
import tech.powerjob.server.persistence.storage.impl.EmptyDFsService;
|
||||||
import tech.powerjob.server.persistence.storage.impl.GridFsService;
|
import tech.powerjob.server.persistence.storage.impl.GridFsService;
|
||||||
|
import tech.powerjob.server.persistence.storage.impl.MySqlSeriesDfsService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Description
|
* Description
|
||||||
@ -23,6 +24,12 @@ public class StorageConfiguration {
|
|||||||
return new GridFsService();
|
return new GridFsService();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Conditional(MySqlSeriesDfsService.MySqlSeriesCondition.class)
|
||||||
|
public DFsService initDbFs() {
|
||||||
|
return new MySqlSeriesDfsService();
|
||||||
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@Conditional(AliOssService.AliOssCondition.class)
|
@Conditional(AliOssService.AliOssCondition.class)
|
||||||
public DFsService initAliOssFs() {
|
public DFsService initAliOssFs() {
|
||||||
|
@ -11,10 +11,12 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||||
|
import org.apache.commons.lang3.time.DateUtils;
|
||||||
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ApplicationContext;
|
||||||
import org.springframework.context.annotation.Conditional;
|
import org.springframework.context.annotation.Conditional;
|
||||||
import org.springframework.core.env.Environment;
|
import org.springframework.core.env.Environment;
|
||||||
import tech.powerjob.common.serialize.JsonUtils;
|
import tech.powerjob.common.serialize.JsonUtils;
|
||||||
|
import tech.powerjob.common.utils.CommonUtils;
|
||||||
import tech.powerjob.server.common.constants.SwitchableStatus;
|
import tech.powerjob.server.common.constants.SwitchableStatus;
|
||||||
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
|
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
|
||||||
import tech.powerjob.server.extension.dfs.*;
|
import tech.powerjob.server.extension.dfs.*;
|
||||||
@ -103,25 +105,25 @@ public class MySqlSeriesDfsService extends AbstractDFsService {
|
|||||||
|
|
||||||
private static final String INSERT_SQL = "insert into %s(bucket, name, version, meta, length, status, data, extra, gmt_create, gmt_modified) values (?,?,?,?,?,?,?,?,?,?);";
|
private static final String INSERT_SQL = "insert into %s(bucket, name, version, meta, length, status, data, extra, gmt_create, gmt_modified) values (?,?,?,?,?,?,?,?,?,?);";
|
||||||
|
|
||||||
private static final String DELETE_SQL = "DELETE FROM %s";
|
private static final String DELETE_SQL = "DELETE FROM %s ";
|
||||||
|
|
||||||
private static final String QUERY_FULL_SQL = "select * from %s";
|
private static final String QUERY_FULL_SQL = "select * from %s";
|
||||||
|
|
||||||
private static final String QUERY_META_SQL = "select bucket, name, version, meta, length, status, extra, gmt_create, gmt_modified from %s";
|
private static final String QUERY_META_SQL = "select bucket, name, version, meta, length, status, extra, gmt_create, gmt_modified from %s";
|
||||||
|
|
||||||
|
|
||||||
private void deleteByLocation(FileLocation fileLocation) {
|
private void deleteByLocation(FileLocation fileLocation) {
|
||||||
String dSQLPrefix = fullSQL(DELETE_SQL);
|
String dSQLPrefix = fullSQL(DELETE_SQL);
|
||||||
|
String dSQL = dSQLPrefix.concat(whereSQL(fileLocation));
|
||||||
|
executeDelete(dSQL);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void executeDelete(String sql) {
|
||||||
try (Connection con = dataSource.getConnection()) {
|
try (Connection con = dataSource.getConnection()) {
|
||||||
|
con.createStatement().executeUpdate(sql);
|
||||||
String dSQL = dSQLPrefix.concat(whereSQL(fileLocation));
|
|
||||||
|
|
||||||
con.createStatement().executeUpdate(dSQL);
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[MySqlSeriesDfsService] deleteByLocation [{}] failed!", fileLocation);
|
log.error("[MySqlSeriesDfsService] executeDelete failed, sql: {}", sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -226,6 +228,20 @@ public class MySqlSeriesDfsService extends AbstractDFsService {
|
|||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanExpiredFiles(String bucket, int days) {
|
||||||
|
|
||||||
|
// 虽然官方提供了服务端删除的能力,依然强烈建议用户直接在数据库层面配置清理事件!!!
|
||||||
|
|
||||||
|
String dSQLPrefix = fullSQL(DELETE_SQL);
|
||||||
|
final long targetTs = DateUtils.addDays(new Date(System.currentTimeMillis()), -days).getTime();
|
||||||
|
final String targetDeleteTime = CommonUtils.formatTime(targetTs);
|
||||||
|
log.info("[MySqlSeriesDfsService] start to cleanExpiredFiles, targetDeleteTime: {}", targetDeleteTime);
|
||||||
|
String fSQL = dSQLPrefix.concat(String.format(" where gmt_modified < '%s'", targetDeleteTime));
|
||||||
|
log.info("[MySqlSeriesDfsService] cleanExpiredFiles SQL: {}", fSQL);
|
||||||
|
executeDelete(fSQL);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void init(ApplicationContext applicationContext) {
|
protected void init(ApplicationContext applicationContext) {
|
||||||
|
|
||||||
|
@ -21,6 +21,8 @@ import java.util.concurrent.ThreadLocalRandom;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class AbstractDfsServiceTest {
|
public abstract class AbstractDfsServiceTest {
|
||||||
|
|
||||||
|
private static final String BUCKET = "pj_test";
|
||||||
|
|
||||||
abstract protected Optional<DFsService> fetchService();
|
abstract protected Optional<DFsService> fetchService();
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -43,7 +45,7 @@ public abstract class AbstractDfsServiceTest {
|
|||||||
FileUtils.forceMkdirParent(sourceFile);
|
FileUtils.forceMkdirParent(sourceFile);
|
||||||
OmsFileUtils.string2File(content, sourceFile);
|
OmsFileUtils.string2File(content, sourceFile);
|
||||||
|
|
||||||
FileLocation fileLocation = new FileLocation().setBucket("pj_test").setName(String.format("test_%d.txt", ThreadLocalRandom.current().nextLong()));
|
FileLocation fileLocation = new FileLocation().setBucket(BUCKET).setName(String.format("test_%d.txt", ThreadLocalRandom.current().nextLong()));
|
||||||
|
|
||||||
StoreRequest storeRequest = new StoreRequest()
|
StoreRequest storeRequest = new StoreRequest()
|
||||||
.setFileLocation(fileLocation)
|
.setFileLocation(fileLocation)
|
||||||
@ -69,6 +71,9 @@ public abstract class AbstractDfsServiceTest {
|
|||||||
String downloadFileContent = FileUtils.readFileToString(downloadFile, StandardCharsets.UTF_8);
|
String downloadFileContent = FileUtils.readFileToString(downloadFile, StandardCharsets.UTF_8);
|
||||||
log.info("[testBaseFileOperation] download content: {}", downloadFileContent);
|
log.info("[testBaseFileOperation] download content: {}", downloadFileContent);
|
||||||
assert downloadFileContent.equals(content);
|
assert downloadFileContent.equals(content);
|
||||||
|
|
||||||
|
// 定时清理,只是执行,不校验
|
||||||
|
aliOssService.cleanExpiredFiles(BUCKET, 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
x
Reference in New Issue
Block a user