diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java index 7a06569f..1eeb5e0e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java @@ -1,10 +1,7 @@ package com.github.kfcfans.oms.server.common.utils; -import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogMetadata; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.springframework.data.mongodb.gridfs.GridFsResource; -import org.springframework.data.mongodb.gridfs.GridFsTemplate; import org.springframework.util.DigestUtils; import javax.servlet.http.HttpServletResponse; @@ -82,40 +79,6 @@ public class OmsFileUtils { } } - /** - * 将 mongoDB 中的数据转存到本地文件中 - * @param gridFsResource mongoDB 文件资源 - * @param targetFile 本地文件资源 - */ - public static void gridFs2File(GridFsResource gridFsResource, File targetFile) { - - byte[] buffer = new byte[1024]; - try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream()); - BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(targetFile)) - ) { - while (gis.read(buffer) != -1) { - bos.write(buffer); - } - bos.flush(); - }catch (IOException ie) { - ExceptionUtils.rethrow(ie); - } - } - - /** - * 将文件保存到 GridFS - * @param gridFsTemplate gridFS操作模版 - * @param localFile 本地文件 - * @param remoteName 存储名称 - * @param metadata 元数据 - * @throws IOException 异常 - */ - public static void storeFile2GridFS(GridFsTemplate gridFsTemplate, File localFile, String remoteName, Object metadata) throws IOException { - try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(localFile))) { - gridFsTemplate.store(bis, remoteName, metadata); - } - } - /** * 计算文件的 MD5 * @param f 文件 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/GridFsHelper.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/GridFsManager.java similarity index 84% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/GridFsHelper.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/GridFsManager.java index 4f91298b..faaf6902 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/GridFsHelper.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/GridFsManager.java @@ -6,8 +6,8 @@ import com.mongodb.client.MongoDatabase; import com.mongodb.client.gridfs.GridFSBucket; import com.mongodb.client.gridfs.GridFSBuckets; import com.mongodb.client.gridfs.GridFSDownloadStream; +import com.mongodb.client.gridfs.GridFSFindIterable; import com.mongodb.client.gridfs.model.GridFSFile; -import com.mongodb.client.gridfs.model.GridFSUploadOptions; import com.mongodb.client.model.Filters; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.DateUtils; @@ -30,16 +30,28 @@ import java.util.function.Consumer; */ @Slf4j @Service -public class GridFsHelper { +public class GridFsManager { private MongoDatabase db; + private Map bucketCache = Maps.newConcurrentMap(); + public static final String LOG_BUCKET = "log"; + public static final String CONTAINER_BUCKET = "container"; + @Autowired(required = false) public void setMongoTemplate(MongoTemplate mongoTemplate) { this.db = mongoTemplate.getDb(); } + /** + * 是否可用 + * @return true:可用;false:不可用 + */ + public boolean available() { + return db != null; + } + /** * 存储文件到 GridFS * @param localFile 本地文件 @@ -106,6 +118,17 @@ public class GridFsHelper { log.info("[GridFsHelper] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop()); } + public boolean exists(String bucketName, String fileName) { + GridFSBucket bucket = getBucket(bucketName); + GridFSFindIterable files = bucket.find(Filters.eq("filename", fileName)); + try { + GridFSFile first = files.first(); + return first != null; + }catch (Exception ignore) { + } + return false; + } + private GridFSBucket getBucket(String bucketName) { return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName)); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogMetadata.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogMetadata.java deleted file mode 100644 index 5f6383e4..00000000 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogMetadata.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.github.kfcfans.oms.server.persistence.mongodb; - -import lombok.Data; - -/** - * 任务日志元数据 - * - * @author tjq - * @since 2020/5/3 - */ -@Data -public class InstanceLogMetadata { - - /** - * 任务实例ID - */ - private long instanceId; - /** - * 文件大小 - */ - private long fileSize; - /** - * 创建时间(用于条件删除) - */ - private long createdTime; - -} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java index 25a20abc..baed9dd2 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java @@ -2,20 +2,17 @@ package com.github.kfcfans.oms.server.service; import akka.actor.ActorSelection; import com.github.kfcfans.oms.common.model.GitRepoInfo; -import com.github.kfcfans.oms.common.model.SystemMetrics; import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest; import com.github.kfcfans.oms.common.request.ServerDestroyContainerRequest; import com.github.kfcfans.oms.common.utils.CommonUtils; import com.github.kfcfans.oms.common.utils.JsonUtils; import com.github.kfcfans.oms.common.utils.NetUtils; import com.github.kfcfans.oms.server.akka.OhMyServer; -import com.github.kfcfans.oms.server.akka.actors.ServerActor; import com.github.kfcfans.oms.server.common.constans.ContainerSourceType; -import com.github.kfcfans.oms.server.common.constans.ContainerStatus; import com.github.kfcfans.oms.server.common.utils.OmsFileUtils; import com.github.kfcfans.oms.server.persistence.core.model.ContainerInfoDO; import com.github.kfcfans.oms.server.persistence.core.repository.ContainerInfoRepository; -import com.github.kfcfans.oms.server.service.ha.ClusterStatusHolder; +import com.github.kfcfans.oms.server.persistence.mongodb.GridFsManager; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; import com.github.kfcfans.oms.server.service.lock.LockService; import com.github.kfcfans.oms.server.web.request.SaveContainerInfoRequest; @@ -24,22 +21,17 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.commons.io.filefilter.IOFileFilter; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.time.DateFormatUtils; -import org.apache.commons.lang3.time.DateUtils; import org.apache.maven.shared.invoker.*; import org.eclipse.jgit.api.CloneCommand; import org.eclipse.jgit.api.Git; -import org.eclipse.jgit.lib.AnyObjectId; -import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.Ref; import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.transport.CredentialsProvider; import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider; import org.springframework.beans.BeanUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; -import org.springframework.data.mongodb.gridfs.GridFsResource; -import org.springframework.data.mongodb.gridfs.GridFsTemplate; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -69,8 +61,8 @@ public class ContainerService { private LockService lockService; @Resource private ContainerInfoRepository containerInfoRepository; - - private GridFsTemplate gridFsTemplate; + @Resource + private GridFsManager gridFsManager; // 并发部署的机器数量 private static final int DEPLOY_BATCH_NUM = 50; @@ -154,9 +146,7 @@ public class ContainerService { String fileName = genContainerJarName(md5); // 上传到 mongoDB - if (gridFsTemplate != null) { - OmsFileUtils.storeFile2GridFS(gridFsTemplate, tmpFile, fileName, null); - } + gridFsManager.store(tmpFile, GridFsManager.CONTAINER_BUCKET, fileName); // 将文件拷贝到正确的路径 String finalFileStr = OmsFileUtils.genContainerJarPath() + fileName; @@ -185,7 +175,7 @@ public class ContainerService { if (localFile.exists()) { return localFile; } - if (gridFsTemplate != null) { + if (gridFsManager.available()) { downloadJarFromGridFS(fileName, localFile); } return localFile; @@ -336,11 +326,10 @@ public class ContainerService { File jarWithDependency = jarFile.iterator().next(); String jarFileName = genContainerJarName(container.getVersion()); - GridFsResource resource = gridFsTemplate.getResource(jarFileName); - if (!resource.exists()) { + if (!gridFsManager.exists(GridFsManager.CONTAINER_BUCKET, jarFileName)) { remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version."); - OmsFileUtils.storeFile2GridFS(gridFsTemplate, jarWithDependency, jarFileName, null); + gridFsManager.download(jarWithDependency, GridFsManager.CONTAINER_BUCKET, jarFileName); remote.sendText("SYSTEM: upload to GridFS successfully~"); } @@ -369,32 +358,26 @@ public class ContainerService { } // 从 MongoDB 下载 - GridFsResource resource = gridFsTemplate.getResource(jarFileName); - if (!resource.exists()) { - remote.sendText(String.format("SYSTEM: can't find %s in local disk and GridFS, deploy failed!", jarFileName)); - return null; - } - remote.sendText("SYSTEM: start to download jar file from GridFS......"); - OmsFileUtils.gridFs2File(resource, localFile); + remote.sendText(String.format("SYSTEM: try to find the jarFile(%s) in GridFS", jarFileName)); + downloadJarFromGridFS(jarFileName, localFile); remote.sendText("SYSTEM: download jar file from GridFS successfully~"); return localFile; } private void downloadJarFromGridFS(String mongoFileName, File targetFile) { - synchronized (mongoFileName.intern()) { + synchronized (("dlLock-" + mongoFileName).intern()) { if (targetFile.exists()) { return; } - GridFsResource gridFsResource = gridFsTemplate.getResource(mongoFileName); - if (!gridFsResource.exists()) { + if (!gridFsManager.exists(GridFsManager.CONTAINER_BUCKET, mongoFileName)) { log.warn("[ContainerService] can't find container's jar file({}) in gridFS.", mongoFileName); return; } try { - OmsFileUtils.gridFs2File(gridFsResource, targetFile); + gridFsManager.download(targetFile, GridFsManager.CONTAINER_BUCKET, mongoFileName); }catch (Exception e) { CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(targetFile)); - throw e; + ExceptionUtils.rethrow(e); } } } @@ -412,9 +395,4 @@ public class ContainerService { return (fileLength / FileUtils.ONE_MB / 10 + 1) * 1000; } - - @Autowired(required = false) - public void setGridFsTemplate(GridFsTemplate gridFsTemplate) { - this.gridFsTemplate = gridFsTemplate; - } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java index 08d588f8..0b3d25fe 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java @@ -8,7 +8,7 @@ import com.github.kfcfans.oms.server.persistence.StringPage; import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogDO; import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogRepository; -import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogMetadata; +import com.github.kfcfans.oms.server.persistence.mongodb.GridFsManager; import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; @@ -19,9 +19,6 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.time.FastDateFormat; import org.springframework.beans.BeanUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.mongodb.gridfs.GridFsResource; -import org.springframework.data.mongodb.gridfs.GridFsTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -46,9 +43,8 @@ import java.util.stream.Stream; @Service public class InstanceLogService { - // 直接操作 mongoDB 文件系统 - private GridFsTemplate gridFsTemplate; - + @Resource + private GridFsManager gridFsManager; // 本地数据库操作bean @Resource(name = "localTransactionTemplate") private TransactionTemplate localTransactionTemplate; @@ -181,14 +177,9 @@ public class InstanceLogService { // 先持久化到本地文件 File stableLogFile = genStableLogFile(instanceId); // 将文件推送到 MongoDB - if (gridFsTemplate != null) { + if (gridFsManager.available()) { try { - InstanceLogMetadata metadata = new InstanceLogMetadata(); - metadata.setInstanceId(instanceId); - metadata.setFileSize(stableLogFile.length()); - metadata.setCreatedTime(System.currentTimeMillis()); - - OmsFileUtils.storeFile2GridFS(gridFsTemplate, stableLogFile, genMongoFileName(instanceId), metadata); + gridFsManager.store(stableLogFile, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId)); log.info("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB succeed, using: {}.", instanceId, sw.stop()); }catch (Exception e) { log.warn("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB failed.", instanceId, e); @@ -255,19 +246,17 @@ public class InstanceLogService { } }else { - if (gridFsTemplate == null) { + if (!gridFsManager.available()) { OmsFileUtils.string2File("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.", f); return f; } // 否则从 mongoDB 拉取数据(对应后期查询的情况) - GridFsResource gridFsResource = gridFsTemplate.getResource(genMongoFileName(instanceId)); - - if (!gridFsResource.exists()) { + if (!gridFsManager.exists(GridFsManager.LOG_BUCKET, genMongoFileName(instanceId))) { OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f); return f; } - OmsFileUtils.gridFs2File(gridFsResource, f); + gridFsManager.download(f, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId)); } return f; }catch (Exception e) { @@ -352,8 +341,4 @@ public class InstanceLogService { return String.format("oms-%d.log", instanceId); } - @Autowired(required = false) - public void setGridFsTemplate(GridFsTemplate gridFsTemplate) { - this.gridFsTemplate = gridFsTemplate; - } } diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/GridFsTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/GridFsTest.java index ca16885d..18918db0 100644 --- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/GridFsTest.java +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/GridFsTest.java @@ -1,6 +1,6 @@ package com.github.kfcfans.oms.server.test; -import com.github.kfcfans.oms.server.persistence.mongodb.GridFsHelper; +import com.github.kfcfans.oms.server.persistence.mongodb.GridFsManager; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; @@ -23,22 +23,28 @@ import java.io.IOException; public class GridFsTest { @Resource - private GridFsHelper gridFsHelper; + private GridFsManager gridFsManager; @Test public void testStore() throws IOException { File file = new File("/Users/tjq/Desktop/DistributeCompute/oms-template-origin.zip"); - gridFsHelper.store(file, "test", "test.zip"); + gridFsManager.store(file, "test", "test.zip"); } @Test public void testDownload() throws IOException { File file = new File("/Users/tjq/Desktop/tmp/test-download.zip"); - gridFsHelper.download(file, "test", "test.zip"); + gridFsManager.download(file, "test", "test.zip"); } @Test public void testDelete() { - gridFsHelper.deleteBefore("fs", 0); + gridFsManager.deleteBefore("fs", 0); + } + + @Test + public void testExists() { + System.out.println(gridFsManager.exists("test", "test.zip")); + System.out.println(gridFsManager.exists("test", "oms-sql.sql")); } }