diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index df3bd5fe..0aac969f 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -35,6 +35,7 @@ 11.5.0.0 42.2.14 2.1.214 + 4.10.2 2.11.2 5.7.0.202003110725-r @@ -97,6 +98,14 @@ powerjob-server-starter ${project.version} + + + + org.mongodb + mongodb-driver-sync + ${mongodb-driver-sync.version} + + @@ -188,11 +197,6 @@ spring-boot-starter-data-jpa ${springboot.version} - - org.springframework.boot - spring-boot-starter-data-mongodb - ${springboot.version} - org.springframework.boot spring-boot-starter-actuator diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/container/ContainerService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/container/ContainerService.java index 59008698..acb2fd0b 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/container/ContainerService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/container/ContainerService.java @@ -41,9 +41,10 @@ import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.common.utils.OmsFileUtils; import tech.powerjob.server.extension.LockService; -import tech.powerjob.server.persistence.mongodb.GridFsManager; +import tech.powerjob.server.extension.dfs.*; import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; +import tech.powerjob.server.persistence.storage.Constants; import tech.powerjob.server.remote.server.redirector.DesignateServer; import tech.powerjob.server.remote.transporter.impl.ServerURLFactory; import tech.powerjob.server.remote.transporter.TransportService; @@ -75,7 +76,7 @@ public class ContainerService { @Resource private ContainerInfoRepository containerInfoRepository; @Resource - private GridFsManager gridFsManager; + private DFsService dFsService; @Resource private TransportService transportService; @@ -167,8 +168,10 @@ public class ContainerService { String md5 = OmsFileUtils.md5(tmpFile); String fileName = genContainerJarName(md5); - // 上传到 mongoDB,这兄弟耗时也有点小严重,导致这个接口整体比较慢...不过也没必要开线程去处理 - gridFsManager.store(tmpFile, GridFsManager.CONTAINER_BUCKET, fileName); + // 上传到 DFS,这兄弟耗时也有点小严重,导致这个接口整体比较慢...不过也没必要开线程去处理 + FileLocation fl = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(fileName); + StoreRequest storeRequest = new StoreRequest().setLocalFile(tmpFile).setFileLocation(fl); + dFsService.store(storeRequest); // 将文件拷贝到正确的路径 String finalFileStr = OmsFileUtils.genContainerJarPath() + fileName; @@ -204,9 +207,17 @@ public class ContainerService { if (localFile.exists()) { return localFile; } - if (gridFsManager.available()) { - downloadJarFromGridFS(fileName, localFile); + + FileLocation fileLocation = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(fileName); + try { + Optional fileMetaOpt = dFsService.fetchFileMeta(fileLocation); + if (fileMetaOpt.isPresent()) { + dFsService.download(new DownloadRequest().setFileLocation(fileLocation).setTarget(localFile)); + } + } catch (Exception e) { + log.warn("[ContainerService] fetchContainerJarFile from dsf failed, version: {}", version, e); } + return localFile; } @@ -412,12 +423,14 @@ public class ContainerService { String jarFileName = genContainerJarName(container.getVersion()); - if (!gridFsManager.exists(GridFsManager.CONTAINER_BUCKET, jarFileName)) { - remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version."); - gridFsManager.store(jarWithDependency, GridFsManager.CONTAINER_BUCKET, jarFileName); - remote.sendText("SYSTEM: upload to GridFS successfully~"); - }else { + FileLocation dfsFL = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(jarFileName); + Optional dfsMetaOpt = dFsService.fetchFileMeta(dfsFL); + if (dfsMetaOpt.isPresent()) { remote.sendText("SYSTEM: find the jar resource in remote successfully, so it's no need to upload anymore."); + } else { + remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version."); + dFsService.store(new StoreRequest().setFileLocation(dfsFL).setLocalFile(jarWithDependency)); + remote.sendText("SYSTEM: upload to GridFS successfully~"); } // 将文件从临时工作目录移动到正式目录 @@ -463,13 +476,19 @@ public class ContainerService { if (targetFile.exists()) { return; } - if (!gridFsManager.exists(GridFsManager.CONTAINER_BUCKET, mongoFileName)) { - log.warn("[ContainerService] can't find container's jar file({}) in gridFS.", mongoFileName); - return; - } + try { + + FileLocation dfsFL = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(mongoFileName); + Optional dfsMetaOpt = dFsService.fetchFileMeta(dfsFL); + if (!dfsMetaOpt.isPresent()) { + log.warn("[ContainerService] can't find container's jar file({}) in gridFS.", mongoFileName); + return; + } + FileUtils.forceMkdirParent(targetFile); - gridFsManager.download(targetFile, GridFsManager.CONTAINER_BUCKET, mongoFileName); + + dFsService.download(new DownloadRequest().setTarget(targetFile).setFileLocation(dfsFL)); }catch (Exception e) { CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(targetFile)); ExceptionUtils.rethrow(e); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java index 5adfe8d7..a12f05d0 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java @@ -10,17 +10,17 @@ import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.NetUtils; import tech.powerjob.common.utils.SegmentLock; import tech.powerjob.server.common.constants.PJThreadPool; +import tech.powerjob.server.extension.dfs.*; +import tech.powerjob.server.persistence.storage.Constants; import tech.powerjob.server.remote.server.redirector.DesignateServer; import tech.powerjob.server.common.utils.OmsFileUtils; import tech.powerjob.server.persistence.StringPage; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.local.LocalInstanceLogDO; import tech.powerjob.server.persistence.local.LocalInstanceLogRepository; -import tech.powerjob.server.persistence.mongodb.GridFsManager; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Queues; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -37,6 +37,7 @@ import javax.annotation.Resource; import java.io.*; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -58,7 +59,7 @@ public class InstanceLogService { private InstanceMetadataService instanceMetadataService; @Resource - private GridFsManager gridFsManager; + private DFsService dFsService; /** * 本地数据库操作bean */ @@ -214,14 +215,16 @@ public class InstanceLogService { // 先持久化到本地文件 File stableLogFile = genStableLogFile(instanceId); // 将文件推送到 MongoDB - if (gridFsManager.available()) { - try { - gridFsManager.store(stableLogFile, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId)); - log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop()); - }catch (Exception e) { - log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e); - } + + FileLocation dfsFL = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId)); + + try { + dFsService.store(new StoreRequest().setLocalFile(stableLogFile).setFileLocation(dfsFL)); + log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop()); + }catch (Exception e) { + log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e); } + }catch (Exception e) { log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e); } @@ -291,17 +294,14 @@ public class InstanceLogService { } }else { - if (!gridFsManager.available()) { - OmsFileUtils.string2File("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.", f); - return f; - } - - // 否则从 mongoDB 拉取数据(对应后期查询的情况) - if (!gridFsManager.exists(GridFsManager.LOG_BUCKET, genMongoFileName(instanceId))) { + FileLocation dfl = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId)); + Optional dflMetaOpt = dFsService.fetchFileMeta(dfl); + if (!dflMetaOpt.isPresent()) { OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f); return f; } - gridFsManager.download(f, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId)); + + dFsService.download(new DownloadRequest().setTarget(f).setFileLocation(dfl)); } return f; }catch (Exception e) { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java index 06037106..9900fb94 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java @@ -13,10 +13,11 @@ import tech.powerjob.common.enums.WorkflowInstanceStatus; import tech.powerjob.server.common.constants.PJThreadPool; import tech.powerjob.server.common.utils.OmsFileUtils; import tech.powerjob.server.extension.LockService; -import tech.powerjob.server.persistence.mongodb.GridFsManager; +import tech.powerjob.server.extension.dfs.DFsService; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository; +import tech.powerjob.server.persistence.storage.Constants; import tech.powerjob.server.remote.worker.WorkerClusterManagerService; import java.io.File; @@ -32,7 +33,7 @@ import java.util.Date; @Service public class CleanService { - private final GridFsManager gridFsManager; + private final DFsService dFsService; private final InstanceInfoRepository instanceInfoRepository; @@ -57,12 +58,12 @@ public class CleanService { private static final String HISTORY_DELETE_LOCK = "history_delete_lock"; - public CleanService(GridFsManager gridFsManager, InstanceInfoRepository instanceInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository, + public CleanService(DFsService dFsService, InstanceInfoRepository instanceInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository, WorkflowNodeInfoRepository workflowNodeInfoRepository, LockService lockService, @Value("${oms.instanceinfo.retention}") int instanceInfoRetentionDay, @Value("${oms.container.retention.local}") int localContainerRetentionDay, @Value("${oms.container.retention.remote}") int remoteContainerRetentionDay) { - this.gridFsManager = gridFsManager; + this.dFsService = dFsService; this.instanceInfoRepository = instanceInfoRepository; this.workflowInstanceInfoRepository = workflowInstanceInfoRepository; this.workflowNodeInfoRepository = workflowNodeInfoRepository; @@ -106,8 +107,8 @@ public class CleanService { // 删除无用节点 cleanWorkflowNodeInfo(); // 删除 GridFS 过期文件 - cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay); - cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay); + cleanRemote(Constants.LOG_BUCKET, instanceInfoRetentionDay); + cleanRemote(Constants.CONTAINER_BUCKET, remoteContainerRetentionDay); } finally { lockService.unlock(HISTORY_DELETE_LOCK); } @@ -152,15 +153,13 @@ public class CleanService { log.info("[CleanService] won't clean up bucket({}) because of offset day <= 0.", bucketName); return; } - if (gridFsManager.available()) { - Stopwatch stopwatch = Stopwatch.createStarted(); - try { - gridFsManager.deleteBefore(bucketName, day); - }catch (Exception e) { - log.warn("[CleanService] clean remote bucket({}) failed.", bucketName, e); - } - log.info("[CleanService] clean remote bucket({}) successfully, using {}.", bucketName, stopwatch.stop()); + Stopwatch stopwatch = Stopwatch.createStarted(); + try { + dFsService.cleanExpiredFiles(bucketName, day); + }catch (Exception e) { + log.warn("[CleanService] clean remote bucket({}) failed.", bucketName, e); } + log.info("[CleanService] clean remote bucket({}) successfully, using {}.", bucketName, stopwatch.stop()); } @VisibleForTesting diff --git a/powerjob-server/powerjob-server-extension/pom.xml b/powerjob-server/powerjob-server-extension/pom.xml index aa65f78d..89954a8b 100644 --- a/powerjob-server/powerjob-server-extension/pom.xml +++ b/powerjob-server/powerjob-server-extension/pom.xml @@ -19,10 +19,6 @@ - - tech.powerjob - powerjob-server-persistence - \ No newline at end of file diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DFsService.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DFsService.java index b51e9b52..fdb77b6b 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DFsService.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DFsService.java @@ -25,13 +25,6 @@ public interface DFsService { */ void download(DownloadRequest downloadRequest) throws IOException; - /** - * 删除文件 - * @param fileLocation 文件位置 - * @throws IOException 异常 - */ - void delete(FileLocation fileLocation) throws IOException; - /** * 获取文件元信息 * @param fileLocation 文件位置 diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DownloadRequest.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DownloadRequest.java index 70efd4fa..e3288f36 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DownloadRequest.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DownloadRequest.java @@ -1,6 +1,7 @@ package tech.powerjob.server.extension.dfs; import lombok.Data; +import lombok.experimental.Accessors; import java.io.File; import java.io.Serializable; @@ -12,6 +13,7 @@ import java.io.Serializable; * @since 2023/7/16 */ @Data +@Accessors(chain = true) public class DownloadRequest implements Serializable { private transient File target; diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileLocation.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileLocation.java index d58a928e..13132876 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileLocation.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileLocation.java @@ -1,6 +1,7 @@ package tech.powerjob.server.extension.dfs; import lombok.Data; +import lombok.experimental.Accessors; /** * 文件路径 @@ -9,6 +10,7 @@ import lombok.Data; * @since 2023/7/16 */ @Data +@Accessors(chain = true) public class FileLocation { /** diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java index 6e15155b..9fb7656a 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java @@ -1,9 +1,9 @@ package tech.powerjob.server.extension.dfs; import lombok.Data; +import lombok.experimental.Accessors; import java.util.Map; -import java.util.Objects; /** * FileMeta @@ -12,15 +12,16 @@ import java.util.Objects; * @since 2023/7/16 */ @Data +@Accessors(chain = true) public class FileMeta { /** * 文件大小 */ - private final long length; + private long length; /** * 元数据 */ - private Map metaInfo; + private Map metaInfo; } diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/StoreRequest.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/StoreRequest.java index b07581e3..1933343c 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/StoreRequest.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/StoreRequest.java @@ -1,6 +1,7 @@ package tech.powerjob.server.extension.dfs; import lombok.Data; +import lombok.experimental.Accessors; import java.io.File; import java.io.Serializable; @@ -12,6 +13,7 @@ import java.io.Serializable; * @since 2023/7/16 */ @Data +@Accessors(chain = true) public class StoreRequest implements Serializable { private transient File localFile; diff --git a/powerjob-server/powerjob-server-persistence/pom.xml b/powerjob-server/powerjob-server-persistence/pom.xml index c5494ae9..a1d526b2 100644 --- a/powerjob-server/powerjob-server-persistence/pom.xml +++ b/powerjob-server/powerjob-server-persistence/pom.xml @@ -23,10 +23,19 @@ tech.powerjob powerjob-server-common + + tech.powerjob + powerjob-server-extension + tech.powerjob powerjob-server-monitor + + + org.mongodb + mongodb-driver-sync + \ No newline at end of file diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/mongodb/GridFsManager.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/mongodb/GridFsManager.java deleted file mode 100644 index d275a0fd..00000000 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/mongodb/GridFsManager.java +++ /dev/null @@ -1,153 +0,0 @@ -package tech.powerjob.server.persistence.mongodb; - -import com.google.common.base.Stopwatch; -import com.google.common.collect.Maps; -import com.mongodb.client.MongoDatabase; -import com.mongodb.client.gridfs.GridFSBucket; -import com.mongodb.client.gridfs.GridFSBuckets; -import com.mongodb.client.gridfs.GridFSDownloadStream; -import com.mongodb.client.gridfs.GridFSFindIterable; -import com.mongodb.client.gridfs.model.GridFSFile; -import com.mongodb.client.model.Filters; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.time.DateUtils; -import org.bson.conversions.Bson; -import org.bson.types.ObjectId; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.core.env.Environment; -import org.springframework.data.mongodb.core.MongoTemplate; -import org.springframework.stereotype.Service; -import tech.powerjob.server.common.PowerJobServerConfigKey; - -import java.io.*; -import java.util.Date; -import java.util.Map; -import java.util.function.Consumer; - -/** - * GridFS 操作助手 - * - * @author tjq - * @since 2020/5/18 - */ -@Slf4j -@Service -public class GridFsManager implements InitializingBean { - - private final Environment environment; - - private final MongoDatabase db; - - private boolean available; - - private final Map bucketCache = Maps.newConcurrentMap(); - - public static final String LOG_BUCKET = "log"; - - public static final String CONTAINER_BUCKET = "container"; - - public GridFsManager(Environment environment, @Autowired(required = false) MongoTemplate mongoTemplate) { - this.environment = environment; - if (mongoTemplate != null) { - this.db = mongoTemplate.getDb(); - } else { - this.db = null; - } - } - - /** - * 是否可用 - * @return true:可用;false:不可用 - */ - public boolean available() { - return available; - } - - /** - * 存储文件到 GridFS - * @param localFile 本地文件 - * @param bucketName 桶名称 - * @param fileName GirdFS中的文件名称 - * @throws IOException 异常 - */ - public void store(File localFile, String bucketName, String fileName) throws IOException { - if (available()) { - GridFSBucket bucket = getBucket(bucketName); - try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(localFile))) { - bucket.uploadFromStream(fileName, bis); - } - } - } - - /** - * 从 GridFS 下载文件 - * @param targetFile 下载的目标文件(本地文件) - * @param bucketName 桶名称 - * @param fileName GirdFS中的文件名称 - * @throws IOException 异常 - */ - public void download(File targetFile, String bucketName, String fileName) throws IOException { - if (available()) { - GridFSBucket bucket = getBucket(bucketName); - try (GridFSDownloadStream gis = bucket.openDownloadStream(fileName); - BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(targetFile)) - ) { - byte[] buffer = new byte[1024]; - int bytes = 0; - while ((bytes = gis.read(buffer)) != -1) { - bos.write(buffer, 0, bytes); - } - bos.flush(); - } - } - } - - /** - * 删除几天前的文件 - * @param bucketName 桶名称 - * @param day 日期偏移量,单位 天 - */ - public void deleteBefore(String bucketName, int day) { - - Stopwatch sw = Stopwatch.createStarted(); - - Date date = DateUtils.addDays(new Date(), -day); - GridFSBucket bucket = getBucket(bucketName); - Bson filter = Filters.lt("uploadDate", date); - - // 循环删除性能很差?我猜你肯定没看过官方实现[狗头]:org.springframework.data.mongodb.gridfs.GridFsTemplate.delete - bucket.find(filter).forEach((Consumer) gridFSFile -> { - ObjectId objectId = gridFSFile.getObjectId(); - try { - bucket.delete(objectId); - log.info("[GridFsManager] deleted {}#{}", bucketName, objectId); - }catch (Exception e) { - log.error("[GridFsManager] deleted {}#{} failed.", bucketName, objectId, e); - } - }); - log.info("[GridFsManager] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop()); - } - - public boolean exists(String bucketName, String fileName) { - GridFSBucket bucket = getBucket(bucketName); - GridFSFindIterable files = bucket.find(Filters.eq("filename", fileName)); - try { - GridFSFile first = files.first(); - return first != null; - }catch (Exception ignore) { - } - return false; - } - - private GridFSBucket getBucket(String bucketName) { - return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName)); - } - - @Override - public void afterPropertiesSet() throws Exception { - String enable = environment.getProperty(PowerJobServerConfigKey.MONGODB_ENABLE, Boolean.FALSE.toString()); - available = Boolean.TRUE.toString().equals(enable) && db != null; - log.info("[GridFsManager] available: {}, db: {}", available, db); - } -} diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java new file mode 100644 index 00000000..a6d6ec56 --- /dev/null +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java @@ -0,0 +1,37 @@ +package tech.powerjob.server.persistence.storage; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.core.env.Environment; +import tech.powerjob.server.extension.dfs.DFsService; + +import javax.annotation.Resource; + +/** + * AbstractDFsService + * + * @author tjq + * @since 2023/7/28 + */ +public abstract class AbstractDFsService implements DFsService, InitializingBean { + + @Resource + protected Environment environment; + + protected boolean active = false; + + protected static final String PROPERTY_KEY = "oms.storage.dfs"; + + protected boolean active() { + return active; + } + + protected void turnOn() { + active = true; + } + + protected static String fetchProperty(Environment environment, String dfsType, String key) { + String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key); + return environment.getProperty(pKey); + } + +} diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/Constants.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/Constants.java new file mode 100644 index 00000000..dd7fcd69 --- /dev/null +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/Constants.java @@ -0,0 +1,15 @@ +package tech.powerjob.server.persistence.storage; + +/** + * Constants + * + * @author tjq + * @since 2023/7/30 + */ +public class Constants { + + public static final String LOG_BUCKET = "log"; + + public static final String CONTAINER_BUCKET = "container"; + +} diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java new file mode 100644 index 00000000..8ab4d997 --- /dev/null +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java @@ -0,0 +1,45 @@ +package tech.powerjob.server.persistence.storage.impl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; +import tech.powerjob.server.extension.dfs.*; +import tech.powerjob.server.persistence.storage.AbstractDFsService; + +import java.io.IOException; +import java.util.Optional; + +/** + * Alibaba OSS support + * 海量、安全、低成本、高可靠的云存储服务 + * + * @author tjq + * @since 2023/7/30 + */ +@Slf4j +@Service +@ConditionalOnProperty(name = {"oms.storage.dfs.alioss.uri"}, matchIfMissing = false) +@ConditionalOnMissingBean(DFsService.class) +public class AliOssService extends AbstractDFsService { + + @Override + public void afterPropertiesSet() throws Exception { + + } + + @Override + public void store(StoreRequest storeRequest) throws IOException { + + } + + @Override + public void download(DownloadRequest downloadRequest) throws IOException { + + } + + @Override + public Optional fetchFileMeta(FileLocation fileLocation) throws IOException { + return Optional.empty(); + } +} diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/EmptyDFsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/EmptyDFsService.java new file mode 100644 index 00000000..2ca193cf --- /dev/null +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/EmptyDFsService.java @@ -0,0 +1,38 @@ +package tech.powerjob.server.persistence.storage.impl; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Service; +import tech.powerjob.server.extension.dfs.*; + +import java.io.IOException; +import java.util.Optional; + +/** + * EmptyDFsService + * + * @author tjq + * @since 2023/7/30 + */ +@Service +@Order(value = Ordered.LOWEST_PRECEDENCE) +@ConditionalOnMissingBean(DFsService.class) +public class EmptyDFsService implements DFsService, { + + public EmptyDFsService() { + } + + @Override + public void store(StoreRequest storeRequest) throws IOException { + } + + @Override + public void download(DownloadRequest downloadRequest) throws IOException { + } + + @Override + public Optional fetchFileMeta(FileLocation fileLocation) throws IOException { + return Optional.empty(); + } +} diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java new file mode 100644 index 00000000..7c1e131d --- /dev/null +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java @@ -0,0 +1,141 @@ +package tech.powerjob.server.persistence.storage.impl; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Maps; +import com.mongodb.ConnectionString; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.gridfs.GridFSBucket; +import com.mongodb.client.gridfs.GridFSBuckets; +import com.mongodb.client.gridfs.GridFSDownloadStream; +import com.mongodb.client.gridfs.GridFSFindIterable; +import com.mongodb.client.gridfs.model.GridFSFile; +import com.mongodb.client.model.Filters; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.DateUtils; +import org.bson.conversions.Bson; +import org.bson.types.ObjectId; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Service; +import tech.powerjob.server.extension.dfs.*; +import tech.powerjob.server.persistence.storage.AbstractDFsService; + +import java.io.*; +import java.nio.file.Files; +import java.util.Date; +import java.util.Map; +import java.util.Optional; + +/** + * 使用 MongoDB GridFS 作为底层存储 + * 配置用法:oms.storage.dfs.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority + * + * @author tjq + * @since 2023/7/28 + */ +@Slf4j +@Service +@ConditionalOnProperty(name = {"oms.storage.dfs.mongodb.uri", "spring.data.mongodb.uri"}, matchIfMissing = false) +@ConditionalOnMissingBean(DFsService.class) +public class GridFsService extends AbstractDFsService implements InitializingBean { + + private MongoDatabase db; + private final Map bucketCache = Maps.newConcurrentMap(); + private static final String TYPE_MONGO = "mongodb"; + + private static final String SPRING_MONGO_DB_CONFIG_KEY = "spring.data.mongodb.uri"; + + @Override + public void store(StoreRequest storeRequest) throws IOException { + GridFSBucket bucket = getBucket(storeRequest.getFileLocation().getBucket()); + try (BufferedInputStream bis = new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath()))) { + bucket.uploadFromStream(storeRequest.getFileLocation().getName(), bis); + } + } + + @Override + public void download(DownloadRequest downloadRequest) throws IOException { + GridFSBucket bucket = getBucket(downloadRequest.getFileLocation().getBucket()); + try (GridFSDownloadStream gis = bucket.openDownloadStream(downloadRequest.getFileLocation().getName()); + BufferedOutputStream bos = new BufferedOutputStream(Files.newOutputStream(downloadRequest.getTarget().toPath())) + ) { + byte[] buffer = new byte[1024]; + int bytes = 0; + while ((bytes = gis.read(buffer)) != -1) { + bos.write(buffer, 0, bytes); + } + bos.flush(); + } + } + + @Override + public Optional fetchFileMeta(FileLocation fileLocation) throws IOException { + GridFSBucket bucket = getBucket(fileLocation.getBucket()); + GridFSFindIterable files = bucket.find(Filters.eq("filename", fileLocation.getName())); + GridFSFile first = files.first(); + if (first == null) { + return Optional.empty(); + } + return Optional.of(new FileMeta() + .setLength(first.getLength()) + .setMetaInfo(first.getMetadata())); + } + + @Override + public void cleanExpiredFiles(String bucketName, int days) { + Stopwatch sw = Stopwatch.createStarted(); + + Date date = DateUtils.addDays(new Date(), -days); + GridFSBucket bucket = getBucket(bucketName); + Bson filter = Filters.lt("uploadDate", date); + + // 循环删除性能很差?我猜你肯定没看过官方实现[狗头]:org.springframework.data.mongodb.gridfs.GridFsTemplate.delete + bucket.find(filter).forEach(gridFSFile -> { + ObjectId objectId = gridFSFile.getObjectId(); + try { + bucket.delete(objectId); + log.info("[GridFsService] deleted {}#{}", bucketName, objectId); + }catch (Exception e) { + log.error("[GridFsService] deleted {}#{} failed.", bucketName, objectId, e); + } + }); + log.info("[GridFsService] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop()); + } + + @Override + public void afterPropertiesSet() throws Exception { + String uri = parseMongoUri(environment); + log.info("[GridFsService] mongoDB uri: {}", uri); + if (StringUtils.isEmpty(uri)) { + log.warn("[GridFsService] uri is empty, GridFsService is off now!"); + return; + } + + ConnectionString connectionString = new ConnectionString(uri); + MongoClient mongoClient = MongoClients.create(connectionString); + db = mongoClient.getDatabase(Optional.ofNullable(connectionString.getDatabase()).orElse("pj")); + + turnOn(); + + log.info("[GridFsService] turn on mongodb GridFs as storage layer."); + } + + private GridFSBucket getBucket(String bucketName) { + return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName)); + } + + static String parseMongoUri(Environment environment) { + // 优先从新的规则读取 + String uri = fetchProperty(environment, TYPE_MONGO, "uri"); + if (StringUtils.isNotEmpty(uri)) { + return uri; + } + // 兼容 4.3.3 前的逻辑,读取 SpringMongoDB 配置 + return environment.getProperty(SPRING_MONGO_DB_CONFIG_KEY); + } +} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java index 2cbef095..62e4d320 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java @@ -2,12 +2,11 @@ package tech.powerjob.server.remote.worker; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import tech.powerjob.common.enums.DispatchStrategy; import tech.powerjob.common.model.DeployedContainerInfo; import tech.powerjob.server.common.module.WorkerInfo; -import tech.powerjob.server.extension.WorkerFilter; +import tech.powerjob.server.remote.worker.filter.WorkerFilter; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.remote.server.redirector.DesignateServer; diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java index 26656ae6..c03a9f9f 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java @@ -6,7 +6,6 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import tech.powerjob.server.common.SJ; import tech.powerjob.server.common.module.WorkerInfo; -import tech.powerjob.server.extension.WorkerFilter; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import java.util.Set; diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DisconnectedWorkerFilter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DisconnectedWorkerFilter.java index ff8482d9..5cdfb9a8 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DisconnectedWorkerFilter.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DisconnectedWorkerFilter.java @@ -1,6 +1,5 @@ package tech.powerjob.server.remote.worker.filter; -import tech.powerjob.server.extension.WorkerFilter; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.common.module.WorkerInfo; import lombok.extern.slf4j.Slf4j; diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/SystemMetricsWorkerFilter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/SystemMetricsWorkerFilter.java index f88bbdd5..2270189a 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/SystemMetricsWorkerFilter.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/SystemMetricsWorkerFilter.java @@ -1,7 +1,6 @@ package tech.powerjob.server.remote.worker.filter; import tech.powerjob.common.model.SystemMetrics; -import tech.powerjob.server.extension.WorkerFilter; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.common.module.WorkerInfo; import lombok.extern.slf4j.Slf4j; diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/WorkerFilter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/WorkerFilter.java similarity index 91% rename from powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/WorkerFilter.java rename to powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/WorkerFilter.java index 5df4214d..35e41586 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/WorkerFilter.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/WorkerFilter.java @@ -1,4 +1,4 @@ -package tech.powerjob.server.extension; +package tech.powerjob.server.remote.worker.filter; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.common.module.WorkerInfo; diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/GridFsTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/GridFsTest.java deleted file mode 100644 index 729dc2f2..00000000 --- a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/GridFsTest.java +++ /dev/null @@ -1,61 +0,0 @@ -package tech.powerjob.server.test; - -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import tech.powerjob.server.persistence.mongodb.GridFsManager; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.junit4.SpringRunner; - -import javax.annotation.Resource; -import java.io.File; -import java.io.IOException; - -/** - * GridFS 测试 - * - * @author tjq - * @since 2020/5/18 - */ -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) -public class GridFsTest { - - @Resource - private GridFsManager gridFsManager; - - @Test - @Disabled - public void testStore() throws IOException { - /** - File file = new File("/Users/tjq/Desktop/DistributeCompute/oms-template-origin.zip"); - gridFsManager.store(file, "test", "test.zip"); - **/ - } - - @Test - @Disabled - public void testDownload() throws IOException { - /** - File file = new File("/Users/tjq/Desktop/tmp/test-download.zip"); - gridFsManager.download(file, "test", "test.zip"); - **/ - } - - @Test - @Disabled - public void testDelete() { - /** - gridFsManager.deleteBefore("fs", 0); - **/ - } - - @Test - @Disabled - public void testExists() { - /** - System.out.println(gridFsManager.exists("test", "test.zip")); - System.out.println(gridFsManager.exists("test", "oms-sql.sql")); - **/ - } - -} diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/OmsLogTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/OmsLogTest.java deleted file mode 100644 index 2fa7a093..00000000 --- a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/OmsLogTest.java +++ /dev/null @@ -1,56 +0,0 @@ -package tech.powerjob.server.test; - -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import tech.powerjob.server.common.utils.OmsFileUtils; -import tech.powerjob.server.persistence.mongodb.GridFsManager; -import tech.powerjob.server.core.scheduler.CleanService; -import com.mongodb.client.gridfs.model.GridFSFile; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.data.mongodb.core.query.Criteria; -import org.springframework.data.mongodb.core.query.Query; -import org.springframework.data.mongodb.gridfs.GridFsTemplate; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.junit4.SpringRunner; - -import javax.annotation.Resource; -import java.util.Date; -import java.util.function.Consumer; - -/** - * 在线日志测试 - * - * @author tjq - * @since 2020/5/11 - */ - -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) -@Disabled -public class OmsLogTest { - - @Resource - private CleanService cleanService; - @Resource - private GridFsTemplate gridFsTemplate; - - @Test - public void testLocalLogCleaner() { - cleanService.cleanLocal(OmsFileUtils.genLogDirPath(), 0); - } - - @Test - public void testRemoteLogCleaner() { - cleanService.cleanRemote(GridFsManager.LOG_BUCKET, 0); - } - - @Test - public void testGridFsQuery() { - Query mongoQuery = Query.query(Criteria.where("uploadDate").gt(new Date())); - gridFsTemplate.find(mongoQuery).forEach(new Consumer() { - @Override - public void accept(GridFSFile gridFSFile) { - System.out.println(gridFSFile.getFilename()); - } - }); - } -}