feat: [storageExt] Unified File Storage Solution with DFsService

tjq 2023-07-30 12:00:45 +08:00
parent fc57226d3a
commit 236d0a7f3b
24 changed files with 372 additions and 343 deletions
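
At a glance: the commit replaces direct GridFsManager calls with a storage-agnostic DFsService extension point, demoting MongoDB GridFS to one pluggable implementation behind it. A minimal sketch of the new call pattern, using only types that appear in this diff (the surrounding variables are illustrative, not part of the commit):

// Sketch of the DFsService API introduced here; ContainerService below shows the real call sites.
FileLocation location = new FileLocation()
        .setBucket(Constants.CONTAINER_BUCKET)  // bucket names now live in persistence.storage.Constants
        .setName(fileName);

// upload a local file
dFsService.store(new StoreRequest().setLocalFile(tmpFile).setFileLocation(location));

// download only when the remote copy exists
Optional<FileMeta> meta = dFsService.fetchFileMeta(location);
if (meta.isPresent()) {
    dFsService.download(new DownloadRequest().setFileLocation(location).setTarget(localFile));
}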

View File

@@ -35,6 +35,7 @@
         <db2-jdbc.version>11.5.0.0</db2-jdbc.version>
         <postgresql.version>42.2.14</postgresql.version>
         <h2.db.version>2.1.214</h2.db.version>
+        <mongodb-driver-sync.version>4.10.2</mongodb-driver-sync.version>
         <zip4j.version>2.11.2</zip4j.version>
         <jgit.version>5.7.0.202003110725-r</jgit.version>
@@ -97,6 +98,14 @@
             <artifactId>powerjob-server-starter</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-sync -->
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-sync</artifactId>
+            <version>${mongodb-driver-sync.version}</version>
+        </dependency>
     </dependencies>
 </dependencyManagement>
@@ -188,11 +197,6 @@
         <artifactId>spring-boot-starter-data-jpa</artifactId>
         <version>${springboot.version}</version>
     </dependency>
-    <dependency>
-        <groupId>org.springframework.boot</groupId>
-        <artifactId>spring-boot-starter-data-mongodb</artifactId>
-        <version>${springboot.version}</version>
-    </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-actuator</artifactId>

View File

@@ -41,9 +41,10 @@ import tech.powerjob.server.common.constants.SwitchableStatus;
 import tech.powerjob.server.common.module.WorkerInfo;
 import tech.powerjob.server.common.utils.OmsFileUtils;
 import tech.powerjob.server.extension.LockService;
-import tech.powerjob.server.persistence.mongodb.GridFsManager;
+import tech.powerjob.server.extension.dfs.*;
 import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
 import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
+import tech.powerjob.server.persistence.storage.Constants;
 import tech.powerjob.server.remote.server.redirector.DesignateServer;
 import tech.powerjob.server.remote.transporter.impl.ServerURLFactory;
 import tech.powerjob.server.remote.transporter.TransportService;
@@ -75,7 +76,7 @@ public class ContainerService {
     @Resource
     private ContainerInfoRepository containerInfoRepository;
     @Resource
-    private GridFsManager gridFsManager;
+    private DFsService dFsService;
     @Resource
     private TransportService transportService;
@@ -167,8 +168,10 @@ public class ContainerService {
         String md5 = OmsFileUtils.md5(tmpFile);
         String fileName = genContainerJarName(md5);
-        // Uploading to MongoDB is also fairly time-consuming, which makes this API slow overall... still not worth spawning a thread for it.
-        gridFsManager.store(tmpFile, GridFsManager.CONTAINER_BUCKET, fileName);
+        // Uploading to the DFS is also fairly time-consuming, which makes this API slow overall... still not worth spawning a thread for it.
+        FileLocation fl = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(fileName);
+        StoreRequest storeRequest = new StoreRequest().setLocalFile(tmpFile).setFileLocation(fl);
+        dFsService.store(storeRequest);
         // Copy the file to the correct path
         String finalFileStr = OmsFileUtils.genContainerJarPath() + fileName;
@@ -204,9 +207,17 @@ public class ContainerService {
         if (localFile.exists()) {
             return localFile;
         }
-        if (gridFsManager.available()) {
-            downloadJarFromGridFS(fileName, localFile);
+        FileLocation fileLocation = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(fileName);
+        try {
+            Optional<FileMeta> fileMetaOpt = dFsService.fetchFileMeta(fileLocation);
+            if (fileMetaOpt.isPresent()) {
+                dFsService.download(new DownloadRequest().setFileLocation(fileLocation).setTarget(localFile));
+            }
+        } catch (Exception e) {
+            log.warn("[ContainerService] fetchContainerJarFile from dfs failed, version: {}", version, e);
         }
         return localFile;
     }
@@ -412,12 +423,14 @@ public class ContainerService {
         String jarFileName = genContainerJarName(container.getVersion());
-        if (!gridFsManager.exists(GridFsManager.CONTAINER_BUCKET, jarFileName)) {
-            remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version.");
-            gridFsManager.store(jarWithDependency, GridFsManager.CONTAINER_BUCKET, jarFileName);
-            remote.sendText("SYSTEM: upload to GridFS successfully~");
-        }else {
+        FileLocation dfsFL = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(jarFileName);
+        Optional<FileMeta> dfsMetaOpt = dFsService.fetchFileMeta(dfsFL);
+        if (dfsMetaOpt.isPresent()) {
             remote.sendText("SYSTEM: find the jar resource in remote successfully, so it's no need to upload anymore.");
+        } else {
+            remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version.");
+            dFsService.store(new StoreRequest().setFileLocation(dfsFL).setLocalFile(jarWithDependency));
+            remote.sendText("SYSTEM: upload to GridFS successfully~");
         }
         // Move the file from the temporary working directory to the official directory
@@ -463,13 +476,19 @@ public class ContainerService {
         if (targetFile.exists()) {
             return;
         }
-        if (!gridFsManager.exists(GridFsManager.CONTAINER_BUCKET, mongoFileName)) {
-            log.warn("[ContainerService] can't find container's jar file({}) in gridFS.", mongoFileName);
-            return;
-        }
         try {
+            FileLocation dfsFL = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(mongoFileName);
+            Optional<FileMeta> dfsMetaOpt = dFsService.fetchFileMeta(dfsFL);
+            if (!dfsMetaOpt.isPresent()) {
+                log.warn("[ContainerService] can't find container's jar file({}) in gridFS.", mongoFileName);
+                return;
+            }
             FileUtils.forceMkdirParent(targetFile);
-            gridFsManager.download(targetFile, GridFsManager.CONTAINER_BUCKET, mongoFileName);
+            dFsService.download(new DownloadRequest().setTarget(targetFile).setFileLocation(dfsFL));
        }catch (Exception e) {
            CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(targetFile));
            ExceptionUtils.rethrow(e);

View File

@@ -10,17 +10,17 @@ import tech.powerjob.common.utils.CommonUtils;
 import tech.powerjob.common.utils.NetUtils;
 import tech.powerjob.common.utils.SegmentLock;
 import tech.powerjob.server.common.constants.PJThreadPool;
+import tech.powerjob.server.extension.dfs.*;
+import tech.powerjob.server.persistence.storage.Constants;
 import tech.powerjob.server.remote.server.redirector.DesignateServer;
 import tech.powerjob.server.common.utils.OmsFileUtils;
 import tech.powerjob.server.persistence.StringPage;
 import tech.powerjob.server.persistence.remote.model.JobInfoDO;
 import tech.powerjob.server.persistence.local.LocalInstanceLogDO;
 import tech.powerjob.server.persistence.local.LocalInstanceLogRepository;
-import tech.powerjob.server.persistence.mongodb.GridFsManager;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -37,6 +37,7 @@ import javax.annotation.Resource;
 import java.io.*;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -58,7 +59,7 @@ public class InstanceLogService {
     private InstanceMetadataService instanceMetadataService;
     @Resource
-    private GridFsManager gridFsManager;
+    private DFsService dFsService;
     /**
      * Bean for local database operations
      */
@@ -214,14 +215,16 @@
             // Persist to a local file first
             File stableLogFile = genStableLogFile(instanceId);
             // Push the file to MongoDB
-            if (gridFsManager.available()) {
-                try {
-                    gridFsManager.store(stableLogFile, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId));
-                    log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop());
-                }catch (Exception e) {
-                    log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e);
-                }
+            FileLocation dfsFL = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
+            try {
+                dFsService.store(new StoreRequest().setLocalFile(stableLogFile).setFileLocation(dfsFL));
+                log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop());
+            }catch (Exception e) {
+                log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e);
             }
         }catch (Exception e) {
             log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e);
         }
@@ -291,17 +294,14 @@
                 }
             }else {
-                if (!gridFsManager.available()) {
-                    OmsFileUtils.string2File("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.", f);
-                    return f;
-                }
-                // Otherwise pull the data from MongoDB, which covers later queries
-                if (!gridFsManager.exists(GridFsManager.LOG_BUCKET, genMongoFileName(instanceId))) {
+                FileLocation dfl = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
+                Optional<FileMeta> dflMetaOpt = dFsService.fetchFileMeta(dfl);
+                if (!dflMetaOpt.isPresent()) {
                     OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f);
                     return f;
                 }
-                gridFsManager.download(f, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId));
+                dFsService.download(new DownloadRequest().setTarget(f).setFileLocation(dfl));
             }
             return f;
         }catch (Exception e) {

View File

@@ -13,10 +13,11 @@ import tech.powerjob.common.enums.WorkflowInstanceStatus;
 import tech.powerjob.server.common.constants.PJThreadPool;
 import tech.powerjob.server.common.utils.OmsFileUtils;
 import tech.powerjob.server.extension.LockService;
-import tech.powerjob.server.persistence.mongodb.GridFsManager;
+import tech.powerjob.server.extension.dfs.DFsService;
 import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
 import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
 import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
+import tech.powerjob.server.persistence.storage.Constants;
 import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
 import java.io.File;
@@ -32,7 +33,7 @@ import java.util.Date;
 @Service
 public class CleanService {
-    private final GridFsManager gridFsManager;
+    private final DFsService dFsService;
     private final InstanceInfoRepository instanceInfoRepository;
@@ -57,12 +58,12 @@
     private static final String HISTORY_DELETE_LOCK = "history_delete_lock";
-    public CleanService(GridFsManager gridFsManager, InstanceInfoRepository instanceInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository,
+    public CleanService(DFsService dFsService, InstanceInfoRepository instanceInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository,
                         WorkflowNodeInfoRepository workflowNodeInfoRepository, LockService lockService,
                         @Value("${oms.instanceinfo.retention}") int instanceInfoRetentionDay,
                         @Value("${oms.container.retention.local}") int localContainerRetentionDay,
                         @Value("${oms.container.retention.remote}") int remoteContainerRetentionDay) {
-        this.gridFsManager = gridFsManager;
+        this.dFsService = dFsService;
         this.instanceInfoRepository = instanceInfoRepository;
         this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
         this.workflowNodeInfoRepository = workflowNodeInfoRepository;
@@ -106,8 +107,8 @@
             // Delete useless nodes
             cleanWorkflowNodeInfo();
             // Delete expired files in GridFS
-            cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay);
-            cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay);
+            cleanRemote(Constants.LOG_BUCKET, instanceInfoRetentionDay);
+            cleanRemote(Constants.CONTAINER_BUCKET, remoteContainerRetentionDay);
         } finally {
             lockService.unlock(HISTORY_DELETE_LOCK);
         }
@@ -152,15 +153,13 @@
             log.info("[CleanService] won't clean up bucket({}) because of offset day <= 0.", bucketName);
             return;
         }
-        if (gridFsManager.available()) {
-            Stopwatch stopwatch = Stopwatch.createStarted();
-            try {
-                gridFsManager.deleteBefore(bucketName, day);
-            }catch (Exception e) {
-                log.warn("[CleanService] clean remote bucket({}) failed.", bucketName, e);
-            }
-            log.info("[CleanService] clean remote bucket({}) successfully, using {}.", bucketName, stopwatch.stop());
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        try {
+            dFsService.cleanExpiredFiles(bucketName, day);
+        }catch (Exception e) {
+            log.warn("[CleanService] clean remote bucket({}) failed.", bucketName, e);
         }
+        log.info("[CleanService] clean remote bucket({}) successfully, using {}.", bucketName, stopwatch.stop());
     }
 @VisibleForTesting

View File

@@ -19,10 +19,6 @@
     </properties>
     <dependencies>
-        <dependency>
-            <groupId>tech.powerjob</groupId>
-            <artifactId>powerjob-server-persistence</artifactId>
-        </dependency>
     </dependencies>
 </project>

View File

@@ -25,13 +25,6 @@ public interface DFsService {
      */
     void download(DownloadRequest downloadRequest) throws IOException;
-    /**
-     * Delete a file
-     * @param fileLocation file location
-     * @throws IOException exception
-     */
-    void delete(FileLocation fileLocation) throws IOException;
     /**
      * Fetch file metadata
      * @param fileLocation file location
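
With delete(FileLocation) removed, the contract stays small. Reconstructed from the implementations elsewhere in this commit (javadoc abridged, so treat this as a sketch rather than the verbatim file), the interface now reads roughly:

public interface DFsService {
    void store(StoreRequest storeRequest) throws IOException;
    void download(DownloadRequest downloadRequest) throws IOException;
    Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException;
    void cleanExpiredFiles(String bucketName, int days);
}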

View File

@@ -1,6 +1,7 @@
 package tech.powerjob.server.extension.dfs;
 import lombok.Data;
+import lombok.experimental.Accessors;
 import java.io.File;
 import java.io.Serializable;
@@ -12,6 +13,7 @@ import java.io.Serializable;
  * @since 2023/7/16
  */
 @Data
+@Accessors(chain = true)
 public class DownloadRequest implements Serializable {
     private transient File target;

View File

@@ -1,6 +1,7 @@
 package tech.powerjob.server.extension.dfs;
 import lombok.Data;
+import lombok.experimental.Accessors;
 /**
  * File path
@@ -9,6 +10,7 @@ import lombok.Data;
  * @since 2023/7/16
  */
 @Data
+@Accessors(chain = true)
 public class FileLocation {
     /**

View File

@@ -1,9 +1,9 @@
 package tech.powerjob.server.extension.dfs;
 import lombok.Data;
+import lombok.experimental.Accessors;
 import java.util.Map;
-import java.util.Objects;
 /**
  * FileMeta
@@ -12,15 +12,16 @@
  * @since 2023/7/16
  */
 @Data
+@Accessors(chain = true)
 public class FileMeta {
     /**
      * File size
      */
-    private final long length;
+    private long length;
     /**
      * Metadata
      */
-    private Map<String, Objects> metaInfo;
+    private Map<String, Object> metaInfo;
 }

View File

@@ -1,6 +1,7 @@
 package tech.powerjob.server.extension.dfs;
 import lombok.Data;
+import lombok.experimental.Accessors;
 import java.io.File;
 import java.io.Serializable;
@@ -12,6 +13,7 @@ import java.io.Serializable;
  * @since 2023/7/16
  */
 @Data
+@Accessors(chain = true)
 public class StoreRequest implements Serializable {
     private transient File localFile;
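
All four DTOs touched here (StoreRequest, DownloadRequest, FileLocation, FileMeta) gain @Accessors(chain = true), which is what enables the fluent new FileLocation().setBucket(...).setName(...) call sites throughout this commit. A hand-written equivalent of what Lombok generates under that annotation (illustrative, not part of the diff):

public class FileLocation {
    private String bucket;
    private String name;

    // With @Accessors(chain = true), Lombok setters return 'this' instead of void:
    public FileLocation setBucket(String bucket) {
        this.bucket = bucket;
        return this;
    }
    public FileLocation setName(String name) {
        this.name = name;
        return this;
    }
}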

View File

@@ -23,10 +23,19 @@
         <groupId>tech.powerjob</groupId>
         <artifactId>powerjob-server-common</artifactId>
     </dependency>
+    <dependency>
+        <groupId>tech.powerjob</groupId>
+        <artifactId>powerjob-server-extension</artifactId>
+    </dependency>
     <dependency>
         <groupId>tech.powerjob</groupId>
         <artifactId>powerjob-server-monitor</artifactId>
     </dependency>
+    <dependency>
+        <groupId>org.mongodb</groupId>
+        <artifactId>mongodb-driver-sync</artifactId>
+    </dependency>
 </dependencies>
</project>

View File

@@ -1,153 +0,0 @@
package tech.powerjob.server.persistence.mongodb;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import com.mongodb.client.gridfs.GridFSFindIterable;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.model.Filters;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Service;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import java.io.*;
import java.util.Date;
import java.util.Map;
import java.util.function.Consumer;
/**
* GridFS operations helper
*
* @author tjq
* @since 2020/5/18
*/
@Slf4j
@Service
public class GridFsManager implements InitializingBean {
private final Environment environment;
private final MongoDatabase db;
private boolean available;
private final Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap();
public static final String LOG_BUCKET = "log";
public static final String CONTAINER_BUCKET = "container";
public GridFsManager(Environment environment, @Autowired(required = false) MongoTemplate mongoTemplate) {
this.environment = environment;
if (mongoTemplate != null) {
this.db = mongoTemplate.getDb();
} else {
this.db = null;
}
}
/**
* Whether GridFS is available
* @return true if available, false otherwise
*/
public boolean available() {
return available;
}
/**
* Store a file to GridFS
* @param localFile local file
* @param bucketName bucket name
* @param fileName file name in GridFS
* @throws IOException exception
*/
public void store(File localFile, String bucketName, String fileName) throws IOException {
if (available()) {
GridFSBucket bucket = getBucket(bucketName);
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(localFile))) {
bucket.uploadFromStream(fileName, bis);
}
}
}
/**
* Download a file from GridFS
* @param targetFile target (local) file to download to
* @param bucketName bucket name
* @param fileName file name in GridFS
* @throws IOException exception
*/
public void download(File targetFile, String bucketName, String fileName) throws IOException {
if (available()) {
GridFSBucket bucket = getBucket(bucketName);
try (GridFSDownloadStream gis = bucket.openDownloadStream(fileName);
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(targetFile))
) {
byte[] buffer = new byte[1024];
int bytes = 0;
while ((bytes = gis.read(buffer)) != -1) {
bos.write(buffer, 0, bytes);
}
bos.flush();
}
}
}
/**
* Delete files older than the given number of days
* @param bucketName bucket name
* @param day offset in days
*/
public void deleteBefore(String bucketName, int day) {
Stopwatch sw = Stopwatch.createStarted();
Date date = DateUtils.addDays(new Date(), -day);
GridFSBucket bucket = getBucket(bucketName);
Bson filter = Filters.lt("uploadDate", date);
// Deleting in a loop performs poorly; I bet you've never read the official implementation [doge]: org.springframework.data.mongodb.gridfs.GridFsTemplate.delete
bucket.find(filter).forEach((Consumer<GridFSFile>) gridFSFile -> {
ObjectId objectId = gridFSFile.getObjectId();
try {
bucket.delete(objectId);
log.info("[GridFsManager] deleted {}#{}", bucketName, objectId);
}catch (Exception e) {
log.error("[GridFsManager] deleted {}#{} failed.", bucketName, objectId, e);
}
});
log.info("[GridFsManager] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop());
}
public boolean exists(String bucketName, String fileName) {
GridFSBucket bucket = getBucket(bucketName);
GridFSFindIterable files = bucket.find(Filters.eq("filename", fileName));
try {
GridFSFile first = files.first();
return first != null;
}catch (Exception ignore) {
}
return false;
}
private GridFSBucket getBucket(String bucketName) {
return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
}
@Override
public void afterPropertiesSet() throws Exception {
String enable = environment.getProperty(PowerJobServerConfigKey.MONGODB_ENABLE, Boolean.FALSE.toString());
available = Boolean.TRUE.toString().equals(enable) && db != null;
log.info("[GridFsManager] available: {}, db: {}", available, db);
}
}

View File

@@ -0,0 +1,37 @@
package tech.powerjob.server.persistence.storage;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.env.Environment;
import tech.powerjob.server.extension.dfs.DFsService;
import javax.annotation.Resource;
/**
* AbstractDFsService
*
* @author tjq
* @since 2023/7/28
*/
public abstract class AbstractDFsService implements DFsService, InitializingBean {
@Resource
protected Environment environment;
protected boolean active = false;
protected static final String PROPERTY_KEY = "oms.storage.dfs";
protected boolean active() {
return active;
}
protected void turnOn() {
active = true;
}
protected static String fetchProperty(Environment environment, String dfsType, String key) {
String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key);
return environment.getProperty(pKey);
}
}
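
fetchProperty is a small namespacing helper: a backend asks for oms.storage.dfs.<dfsType>.<key>. For instance, GridFsService below resolves its connection string like this (the value shown is hypothetical):

// Inside a subclass whose dfsType is "mongodb":
String uri = fetchProperty(environment, "mongodb", "uri");
// -> reads the property "oms.storage.dfs.mongodb.uri", e.g. "mongodb://localhost/powerjob"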

View File

@@ -0,0 +1,15 @@
package tech.powerjob.server.persistence.storage;
/**
* Constants
*
* @author tjq
* @since 2023/7/30
*/
public class Constants {
public static final String LOG_BUCKET = "log";
public static final String CONTAINER_BUCKET = "container";
}

View File

@@ -0,0 +1,45 @@
package tech.powerjob.server.persistence.storage.impl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.storage.AbstractDFsService;
import java.io.IOException;
import java.util.Optional;
/**
* Alibaba OSS support
* <a href="https://www.aliyun.com/product/oss">Massive, secure, low-cost, and highly reliable cloud storage service</a>
*
* @author tjq
* @since 2023/7/30
*/
@Slf4j
@Service
@ConditionalOnProperty(name = {"oms.storage.dfs.alioss.uri"}, matchIfMissing = false)
@ConditionalOnMissingBean(DFsService.class)
public class AliOssService extends AbstractDFsService {
@Override
public void afterPropertiesSet() throws Exception {
    // Intentionally empty in this commit: OSS support is a placeholder that never calls turnOn().
}
@Override
public void store(StoreRequest storeRequest) throws IOException {
    // not implemented yet
}
@Override
public void download(DownloadRequest downloadRequest) throws IOException {
    // not implemented yet
}
@Override
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
    return Optional.empty();
}
}

View File

@@ -0,0 +1,38 @@
package tech.powerjob.server.persistence.storage.impl;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import tech.powerjob.server.extension.dfs.*;
import java.io.IOException;
import java.util.Optional;
/**
* EmptyDFsService
*
* @author tjq
* @since 2023/7/30
*/
@Service
@Order(value = Ordered.LOWEST_PRECEDENCE)
@ConditionalOnMissingBean(DFsService.class)
public class EmptyDFsService implements DFsService {
public EmptyDFsService() {
}
@Override
public void store(StoreRequest storeRequest) throws IOException {
}
@Override
public void download(DownloadRequest downloadRequest) throws IOException {
}
@Override
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
return Optional.empty();
}
}
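
Every built-in backend is guarded by @ConditionalOnMissingBean(DFsService.class), and EmptyDFsService sits at the lowest precedence, so this no-op fallback only materializes when nothing else registered. A hypothetical user-supplied backend (the name and bodies below are invented for illustration) would displace it simply by existing:

// Hypothetical custom backend; registering any DFsService bean suppresses the built-in fallbacks.
@Service
public class MyDfsService implements DFsService {
    @Override
    public void store(StoreRequest storeRequest) throws IOException { /* push bytes to durable storage */ }
    @Override
    public void download(DownloadRequest downloadRequest) throws IOException { /* fetch them back */ }
    @Override
    public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
        return Optional.empty(); // report "not found" until implemented
    }
    @Override
    public void cleanExpiredFiles(String bucketName, int days) { /* optional housekeeping */ }
}

One caveat: Spring only guarantees evaluation order for @ConditionalOnMissingBean inside auto-configuration classes, so relying on it between plain @Service beans as done here depends on component-scan ordering.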

View File

@@ -0,0 +1,141 @@
package tech.powerjob.server.persistence.storage.impl;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.mongodb.ConnectionString;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import com.mongodb.client.gridfs.GridFSFindIterable;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.model.Filters;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.storage.AbstractDFsService;
import java.io.*;
import java.nio.file.Files;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
/**
* Uses MongoDB GridFS as the underlying storage layer.
* Configuration example: oms.storage.dfs.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority
*
* @author tjq
* @since 2023/7/28
*/
@Slf4j
@Service
@ConditionalOnProperty(name = {"oms.storage.dfs.mongodb.uri", "spring.data.mongodb.uri"}, matchIfMissing = false)
@ConditionalOnMissingBean(DFsService.class)
public class GridFsService extends AbstractDFsService implements InitializingBean {
private MongoDatabase db;
private final Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap();
private static final String TYPE_MONGO = "mongodb";
private static final String SPRING_MONGO_DB_CONFIG_KEY = "spring.data.mongodb.uri";
@Override
public void store(StoreRequest storeRequest) throws IOException {
GridFSBucket bucket = getBucket(storeRequest.getFileLocation().getBucket());
try (BufferedInputStream bis = new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath()))) {
bucket.uploadFromStream(storeRequest.getFileLocation().getName(), bis);
}
}
@Override
public void download(DownloadRequest downloadRequest) throws IOException {
GridFSBucket bucket = getBucket(downloadRequest.getFileLocation().getBucket());
try (GridFSDownloadStream gis = bucket.openDownloadStream(downloadRequest.getFileLocation().getName());
BufferedOutputStream bos = new BufferedOutputStream(Files.newOutputStream(downloadRequest.getTarget().toPath()))
) {
byte[] buffer = new byte[1024];
int bytes = 0;
while ((bytes = gis.read(buffer)) != -1) {
bos.write(buffer, 0, bytes);
}
bos.flush();
}
}
@Override
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
GridFSBucket bucket = getBucket(fileLocation.getBucket());
GridFSFindIterable files = bucket.find(Filters.eq("filename", fileLocation.getName()));
GridFSFile first = files.first();
if (first == null) {
return Optional.empty();
}
return Optional.of(new FileMeta()
.setLength(first.getLength())
.setMetaInfo(first.getMetadata()));
}
@Override
public void cleanExpiredFiles(String bucketName, int days) {
Stopwatch sw = Stopwatch.createStarted();
Date date = DateUtils.addDays(new Date(), -days);
GridFSBucket bucket = getBucket(bucketName);
Bson filter = Filters.lt("uploadDate", date);
// Deleting in a loop performs poorly; I bet you've never read the official implementation [doge]: org.springframework.data.mongodb.gridfs.GridFsTemplate.delete
bucket.find(filter).forEach(gridFSFile -> {
ObjectId objectId = gridFSFile.getObjectId();
try {
bucket.delete(objectId);
log.info("[GridFsService] deleted {}#{}", bucketName, objectId);
}catch (Exception e) {
log.error("[GridFsService] deleted {}#{} failed.", bucketName, objectId, e);
}
});
log.info("[GridFsService] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop());
}
@Override
public void afterPropertiesSet() throws Exception {
String uri = parseMongoUri(environment);
log.info("[GridFsService] mongoDB uri: {}", uri);
if (StringUtils.isEmpty(uri)) {
log.warn("[GridFsService] uri is empty, GridFsService is off now!");
return;
}
ConnectionString connectionString = new ConnectionString(uri);
MongoClient mongoClient = MongoClients.create(connectionString);
db = mongoClient.getDatabase(Optional.ofNullable(connectionString.getDatabase()).orElse("pj"));
turnOn();
log.info("[GridFsService] turn on mongodb GridFs as storage layer.");
}
private GridFSBucket getBucket(String bucketName) {
return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
}
static String parseMongoUri(Environment environment) {
// Prefer the new-style configuration key
String uri = fetchProperty(environment, TYPE_MONGO, "uri");
if (StringUtils.isNotEmpty(uri)) {
return uri;
}
// Compatibility with pre-4.3.3 behavior: fall back to Spring's MongoDB configuration
return environment.getProperty(SPRING_MONGO_DB_CONFIG_KEY);
}
}
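
parseMongoUri is package-private and static, so its precedence is easy to pin down from a test in the same package. A quick sketch using Spring's MockEnvironment (assumes spring-test on the classpath; illustration only):

// org.springframework.mock.env.MockEnvironment, from spring-test
MockEnvironment env = new MockEnvironment();
env.setProperty("spring.data.mongodb.uri", "mongodb://localhost/legacy");
// only the pre-4.3.3 Spring key is set, so the fallback applies:
assert "mongodb://localhost/legacy".equals(GridFsService.parseMongoUri(env));

env.setProperty("oms.storage.dfs.mongodb.uri", "mongodb://localhost/pj");
// the new-style key wins once present:
assert "mongodb://localhost/pj".equals(GridFsService.parseMongoUri(env));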

View File

@@ -2,12 +2,11 @@ package tech.powerjob.server.remote.worker;
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import tech.powerjob.common.enums.DispatchStrategy;
 import tech.powerjob.common.model.DeployedContainerInfo;
 import tech.powerjob.server.common.module.WorkerInfo;
-import tech.powerjob.server.extension.WorkerFilter;
+import tech.powerjob.server.remote.worker.filter.WorkerFilter;
 import tech.powerjob.server.persistence.remote.model.JobInfoDO;
 import tech.powerjob.server.remote.server.redirector.DesignateServer;

View File

@@ -6,7 +6,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;
 import tech.powerjob.server.common.SJ;
 import tech.powerjob.server.common.module.WorkerInfo;
-import tech.powerjob.server.extension.WorkerFilter;
 import tech.powerjob.server.persistence.remote.model.JobInfoDO;
 import java.util.Set;

View File

@@ -1,6 +1,5 @@
 package tech.powerjob.server.remote.worker.filter;
-import tech.powerjob.server.extension.WorkerFilter;
 import tech.powerjob.server.persistence.remote.model.JobInfoDO;
 import tech.powerjob.server.common.module.WorkerInfo;
 import lombok.extern.slf4j.Slf4j;

View File

@@ -1,7 +1,6 @@
 package tech.powerjob.server.remote.worker.filter;
 import tech.powerjob.common.model.SystemMetrics;
-import tech.powerjob.server.extension.WorkerFilter;
 import tech.powerjob.server.persistence.remote.model.JobInfoDO;
 import tech.powerjob.server.common.module.WorkerInfo;
 import lombok.extern.slf4j.Slf4j;

View File

@@ -1,4 +1,4 @@
-package tech.powerjob.server.extension;
+package tech.powerjob.server.remote.worker.filter;
 import tech.powerjob.server.persistence.remote.model.JobInfoDO;
 import tech.powerjob.server.common.module.WorkerInfo;

View File

@@ -1,61 +0,0 @@
package tech.powerjob.server.test;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import tech.powerjob.server.persistence.mongodb.GridFsManager;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
/**
* GridFS test
*
* @author tjq
* @since 2020/5/18
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class GridFsTest {
@Resource
private GridFsManager gridFsManager;
@Test
@Disabled
public void testStore() throws IOException {
/**
File file = new File("/Users/tjq/Desktop/DistributeCompute/oms-template-origin.zip");
gridFsManager.store(file, "test", "test.zip");
**/
}
@Test
@Disabled
public void testDownload() throws IOException {
/**
File file = new File("/Users/tjq/Desktop/tmp/test-download.zip");
gridFsManager.download(file, "test", "test.zip");
**/
}
@Test
@Disabled
public void testDelete() {
/**
gridFsManager.deleteBefore("fs", 0);
**/
}
@Test
@Disabled
public void testExists() {
/**
System.out.println(gridFsManager.exists("test", "test.zip"));
System.out.println(gridFsManager.exists("test", "oms-sql.sql"));
**/
}
}

View File

@@ -1,56 +0,0 @@
package tech.powerjob.server.test;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.persistence.mongodb.GridFsManager;
import tech.powerjob.server.core.scheduler.CleanService;
import com.mongodb.client.gridfs.model.GridFSFile;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.gridfs.GridFsTemplate;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.Date;
import java.util.function.Consumer;
/**
* Online log test
*
* @author tjq
* @since 2020/5/11
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Disabled
public class OmsLogTest {
@Resource
private CleanService cleanService;
@Resource
private GridFsTemplate gridFsTemplate;
@Test
public void testLocalLogCleaner() {
cleanService.cleanLocal(OmsFileUtils.genLogDirPath(), 0);
}
@Test
public void testRemoteLogCleaner() {
cleanService.cleanRemote(GridFsManager.LOG_BUCKET, 0);
}
@Test
public void testGridFsQuery() {
Query mongoQuery = Query.query(Criteria.where("uploadDate").gt(new Date()));
gridFsTemplate.find(mongoQuery).forEach(new Consumer<GridFSFile>() {
@Override
public void accept(GridFSFile gridFSFile) {
System.out.println(gridFSFile.getFilename());
}
});
}
}