From f0514ac65f7df34bd9be3d06dd3f17627eb452a7 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 30 Jul 2023 14:39:57 +0800 Subject: [PATCH] feat: [storageExt] support alicloud oss --- powerjob-server/pom.xml | 9 +- .../server/extension/dfs/FileMeta.java | 5 + .../powerjob-server-persistence/pom.xml | 4 + .../storage/AbstractDFsService.java | 2 +- .../storage/impl/AliOssService.java | 127 +++++++++++++++++- .../storage/impl/GridFsService.java | 35 ++--- 6 files changed, 164 insertions(+), 18 deletions(-) diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 0aac969f..6c762e93 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -54,6 +54,7 @@ 4.3.3 4.3.3 1.6.14 + 3.17.1 @@ -99,12 +100,18 @@ ${project.version} - + org.mongodb mongodb-driver-sync ${mongodb-driver-sync.version} + + + com.aliyun.oss + aliyun-sdk-oss + ${aliyun-sdk-oss.version} + diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java index 9fb7656a..077701aa 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java @@ -3,6 +3,7 @@ package tech.powerjob.server.extension.dfs; import lombok.Data; import lombok.experimental.Accessors; +import java.util.Date; import java.util.Map; /** @@ -19,6 +20,10 @@ public class FileMeta { * 文件大小 */ private long length; + /** + * 最后修改时间 + */ + private Date lastModifiedTime; /** * 元数据 diff --git a/powerjob-server/powerjob-server-persistence/pom.xml b/powerjob-server/powerjob-server-persistence/pom.xml index a1d526b2..d476400b 100644 --- a/powerjob-server/powerjob-server-persistence/pom.xml +++ b/powerjob-server/powerjob-server-persistence/pom.xml @@ -36,6 +36,10 @@ org.mongodb mongodb-driver-sync + + com.aliyun.oss + aliyun-sdk-oss + \ No newline at end of file diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java index a6d6ec56..99b2c72a 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java @@ -29,7 +29,7 @@ public abstract class AbstractDFsService implements DFsService, InitializingBean active = true; } - protected static String fetchProperty(Environment environment, String dfsType, String key) { + protected String fetchProperty(String dfsType, String key) { String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key); return environment.getProperty(pKey); } diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java index 8ab4d997..321dc8b6 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java @@ -1,6 +1,17 @@ package tech.powerjob.server.persistence.storage.impl; +import com.aliyun.oss.*; +import com.aliyun.oss.common.auth.CredentialsProvider; +import com.aliyun.oss.common.auth.CredentialsProviderFactory; +import com.aliyun.oss.common.auth.DefaultCredentialProvider; +import com.aliyun.oss.model.DownloadFileRequest; +import com.aliyun.oss.model.ObjectMetadata; +import com.aliyun.oss.model.PutObjectRequest; +import com.google.common.collect.Maps; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; @@ -8,38 +19,152 @@ import tech.powerjob.server.extension.dfs.*; import tech.powerjob.server.persistence.storage.AbstractDFsService; import java.io.IOException; +import java.util.Map; import java.util.Optional; /** * Alibaba OSS support * 海量、安全、低成本、高可靠的云存储服务 + * 配置项: + * oms.storage.dfs.alioss.endpoint + * oms.storage.dfs.alioss.credential_type + * oms.storage.dfs.alioss.ak + * oms.storage.dfs.alioss.sk + * oms.storage.dfs.alioss.token * * @author tjq * @since 2023/7/30 */ @Slf4j @Service -@ConditionalOnProperty(name = {"oms.storage.dfs.alioss.uri"}, matchIfMissing = false) +@ConditionalOnProperty(name = {"oms.storage.dfs.alioss.endpoint"}, matchIfMissing = false) @ConditionalOnMissingBean(DFsService.class) public class AliOssService extends AbstractDFsService { + private static final String TYPE_ALI_OSS = "alioss"; + + private static final String KEY_ENDPOINT = "endpoint"; + private static final String KEY_CREDENTIAL_TYPE = "credential_type"; + private static final String KEY_AK = "ak"; + private static final String KEY_SK = "sk"; + private static final String KEY_TOKEN = "token"; + + private OSS oss; + + private static final int DOWNLOAD_PART_SIZE = 10240; + @Override public void afterPropertiesSet() throws Exception { + String endpoint = fetchProperty(TYPE_ALI_OSS, KEY_ENDPOINT); + String ct = fetchProperty(TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE); + String ak = fetchProperty(TYPE_ALI_OSS, KEY_AK); + String sk = fetchProperty(TYPE_ALI_OSS, KEY_SK); + String token = fetchProperty(TYPE_ALI_OSS, KEY_TOKEN); + + initOssClient(endpoint, ct, ak, sk, token); } @Override public void store(StoreRequest storeRequest) throws IOException { + FileLocation dfl = storeRequest.getFileLocation(); + ObjectMetadata objectMetadata = new ObjectMetadata(); + + PutObjectRequest putObjectRequest = new PutObjectRequest(dfl.getBucket(), dfl.getName(), storeRequest.getLocalFile(), objectMetadata); + oss.putObject(putObjectRequest); } @Override public void download(DownloadRequest downloadRequest) throws IOException { + FileLocation dfl = downloadRequest.getFileLocation(); + DownloadFileRequest downloadFileRequest = new DownloadFileRequest(dfl.getBucket(), dfl.getName(), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE); + try { + oss.downloadFile(downloadFileRequest); + } catch (Throwable t) { + ExceptionUtils.rethrow(t); + } } @Override public Optional fetchFileMeta(FileLocation fileLocation) throws IOException { + try { + ObjectMetadata objectMetadata = oss.getObjectMetadata(fileLocation.getBucket(), fileLocation.getName()); + return Optional.ofNullable(objectMetadata).map(ossM -> { + + Map metaInfo = Maps.newHashMap(); + metaInfo.putAll(ossM.getRawMetadata()); + if (ossM.getUserMetadata() != null) { + metaInfo.putAll(ossM.getUserMetadata()); + } + + return new FileMeta() + .setLastModifiedTime(ossM.getLastModified()) + .setLength(ossM.getContentLength()) + .setMetaInfo(metaInfo); + }); + } catch (OSSException oe) { + // TODO: 判断文件不存在 + } return Optional.empty(); } + + void initOssClient(String endpoint, String mode, String ak, String sk, String token) throws Exception { + + log.info("[AliOssService] init OSS by config: {},{},{},{},{}", endpoint, mode, ak, sk, token); + + CredentialsProvider credentialsProvider; + CredentialType credentialType = CredentialType.parse(mode); + switch (credentialType) { + case PWD: + credentialsProvider = new DefaultCredentialProvider(ak, sk, token); + break; + case SYSTEM_PROPERTY: + credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider(); + break; + default: + credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider(); + } + + this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider); + log.info("[AliOssService] initialize OSS successfully!"); + } + + + @AllArgsConstructor + enum CredentialType { + /** + * 从环境读取 + */ + ENV("env"), + /** + * 系统配置 + */ + SYSTEM_PROPERTY("sys"), + /** + * 从账号密码读取 + */ + PWD("pwd") + ; + + private final String code; + + /** + * parse credential type + * @param mode oms.storage.dfs.alioss.credential_type + * @return CredentialType + */ + public static CredentialType parse(String mode) { + + for (CredentialType credentialType : values()) { + if (StringUtils.equalsIgnoreCase(credentialType.code, mode)) { + return credentialType; + } + } + + return PWD; + } + + } } diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java index 7c1e131d..68e973f8 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java @@ -83,6 +83,7 @@ public class GridFsService extends AbstractDFsService implements InitializingBea } return Optional.of(new FileMeta() .setLength(first.getLength()) + .setLastModifiedTime(first.getUploadDate()) .setMetaInfo(first.getMetadata())); } @@ -109,7 +110,25 @@ public class GridFsService extends AbstractDFsService implements InitializingBea @Override public void afterPropertiesSet() throws Exception { - String uri = parseMongoUri(environment); + String uri = parseMongoUri(); + initMongo(uri); + } + + private GridFSBucket getBucket(String bucketName) { + return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName)); + } + + private String parseMongoUri() { + // 优先从新的规则读取 + String uri = fetchProperty(TYPE_MONGO, "uri"); + if (StringUtils.isNotEmpty(uri)) { + return uri; + } + // 兼容 4.3.3 前的逻辑,读取 SpringMongoDB 配置 + return environment.getProperty(SPRING_MONGO_DB_CONFIG_KEY); + } + + private void initMongo(String uri) { log.info("[GridFsService] mongoDB uri: {}", uri); if (StringUtils.isEmpty(uri)) { log.warn("[GridFsService] uri is empty, GridFsService is off now!"); @@ -124,18 +143,4 @@ public class GridFsService extends AbstractDFsService implements InitializingBea log.info("[GridFsService] turn on mongodb GridFs as storage layer."); } - - private GridFSBucket getBucket(String bucketName) { - return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName)); - } - - static String parseMongoUri(Environment environment) { - // 优先从新的规则读取 - String uri = fetchProperty(environment, TYPE_MONGO, "uri"); - if (StringUtils.isNotEmpty(uri)) { - return uri; - } - // 兼容 4.3.3 前的逻辑,读取 SpringMongoDB 配置 - return environment.getProperty(SPRING_MONGO_DB_CONFIG_KEY); - } }