mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: [storageExt] support alicloud oss
This commit is contained in:
parent
236d0a7f3b
commit
f0514ac65f
@ -54,6 +54,7 @@
|
||||
<powerjob-remote-impl-http.version>4.3.3</powerjob-remote-impl-http.version>
|
||||
<powerjob-remote-impl-akka.version>4.3.3</powerjob-remote-impl-akka.version>
|
||||
<springdoc-openapi-ui.version>1.6.14</springdoc-openapi-ui.version>
|
||||
<aliyun-sdk-oss.version>3.17.1</aliyun-sdk-oss.version>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
@ -99,12 +100,18 @@
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-sync -->
|
||||
<!-- 存储扩展-MongoDB,未使用可移除 -->
|
||||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongodb-driver-sync</artifactId>
|
||||
<version>${mongodb-driver-sync.version}</version>
|
||||
</dependency>
|
||||
<!-- 存储扩展-阿里云OSS,未使用可移除 -->
|
||||
<dependency>
|
||||
<groupId>com.aliyun.oss</groupId>
|
||||
<artifactId>aliyun-sdk-oss</artifactId>
|
||||
<version>${aliyun-sdk-oss.version}</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
@ -3,6 +3,7 @@ package tech.powerjob.server.extension.dfs;
|
||||
import lombok.Data;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@ -19,6 +20,10 @@ public class FileMeta {
|
||||
* 文件大小
|
||||
*/
|
||||
private long length;
|
||||
/**
|
||||
* 最后修改时间
|
||||
*/
|
||||
private Date lastModifiedTime;
|
||||
|
||||
/**
|
||||
* 元数据
|
||||
|
@ -36,6 +36,10 @@
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongodb-driver-sync</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.aliyun.oss</groupId>
|
||||
<artifactId>aliyun-sdk-oss</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -29,7 +29,7 @@ public abstract class AbstractDFsService implements DFsService, InitializingBean
|
||||
active = true;
|
||||
}
|
||||
|
||||
protected static String fetchProperty(Environment environment, String dfsType, String key) {
|
||||
protected String fetchProperty(String dfsType, String key) {
|
||||
String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key);
|
||||
return environment.getProperty(pKey);
|
||||
}
|
||||
|
@ -1,6 +1,17 @@
|
||||
package tech.powerjob.server.persistence.storage.impl;
|
||||
|
||||
import com.aliyun.oss.*;
|
||||
import com.aliyun.oss.common.auth.CredentialsProvider;
|
||||
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
|
||||
import com.aliyun.oss.common.auth.DefaultCredentialProvider;
|
||||
import com.aliyun.oss.model.DownloadFileRequest;
|
||||
import com.aliyun.oss.model.ObjectMetadata;
|
||||
import com.aliyun.oss.model.PutObjectRequest;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Service;
|
||||
@ -8,38 +19,152 @@ import tech.powerjob.server.extension.dfs.*;
|
||||
import tech.powerjob.server.persistence.storage.AbstractDFsService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Alibaba OSS support
|
||||
* <a href="https://www.aliyun.com/product/oss">海量、安全、低成本、高可靠的云存储服务</a>
|
||||
* 配置项:
|
||||
* oms.storage.dfs.alioss.endpoint
|
||||
* oms.storage.dfs.alioss.credential_type
|
||||
* oms.storage.dfs.alioss.ak
|
||||
* oms.storage.dfs.alioss.sk
|
||||
* oms.storage.dfs.alioss.token
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2023/7/30
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@ConditionalOnProperty(name = {"oms.storage.dfs.alioss.uri"}, matchIfMissing = false)
|
||||
@ConditionalOnProperty(name = {"oms.storage.dfs.alioss.endpoint"}, matchIfMissing = false)
|
||||
@ConditionalOnMissingBean(DFsService.class)
|
||||
public class AliOssService extends AbstractDFsService {
|
||||
|
||||
private static final String TYPE_ALI_OSS = "alioss";
|
||||
|
||||
private static final String KEY_ENDPOINT = "endpoint";
|
||||
private static final String KEY_CREDENTIAL_TYPE = "credential_type";
|
||||
private static final String KEY_AK = "ak";
|
||||
private static final String KEY_SK = "sk";
|
||||
private static final String KEY_TOKEN = "token";
|
||||
|
||||
private OSS oss;
|
||||
|
||||
private static final int DOWNLOAD_PART_SIZE = 10240;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
|
||||
String endpoint = fetchProperty(TYPE_ALI_OSS, KEY_ENDPOINT);
|
||||
String ct = fetchProperty(TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE);
|
||||
String ak = fetchProperty(TYPE_ALI_OSS, KEY_AK);
|
||||
String sk = fetchProperty(TYPE_ALI_OSS, KEY_SK);
|
||||
String token = fetchProperty(TYPE_ALI_OSS, KEY_TOKEN);
|
||||
|
||||
initOssClient(endpoint, ct, ak, sk, token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void store(StoreRequest storeRequest) throws IOException {
|
||||
|
||||
FileLocation dfl = storeRequest.getFileLocation();
|
||||
ObjectMetadata objectMetadata = new ObjectMetadata();
|
||||
|
||||
PutObjectRequest putObjectRequest = new PutObjectRequest(dfl.getBucket(), dfl.getName(), storeRequest.getLocalFile(), objectMetadata);
|
||||
oss.putObject(putObjectRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void download(DownloadRequest downloadRequest) throws IOException {
|
||||
|
||||
FileLocation dfl = downloadRequest.getFileLocation();
|
||||
DownloadFileRequest downloadFileRequest = new DownloadFileRequest(dfl.getBucket(), dfl.getName(), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE);
|
||||
try {
|
||||
oss.downloadFile(downloadFileRequest);
|
||||
} catch (Throwable t) {
|
||||
ExceptionUtils.rethrow(t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
|
||||
try {
|
||||
ObjectMetadata objectMetadata = oss.getObjectMetadata(fileLocation.getBucket(), fileLocation.getName());
|
||||
return Optional.ofNullable(objectMetadata).map(ossM -> {
|
||||
|
||||
Map<String, Object> metaInfo = Maps.newHashMap();
|
||||
metaInfo.putAll(ossM.getRawMetadata());
|
||||
if (ossM.getUserMetadata() != null) {
|
||||
metaInfo.putAll(ossM.getUserMetadata());
|
||||
}
|
||||
|
||||
return new FileMeta()
|
||||
.setLastModifiedTime(ossM.getLastModified())
|
||||
.setLength(ossM.getContentLength())
|
||||
.setMetaInfo(metaInfo);
|
||||
});
|
||||
} catch (OSSException oe) {
|
||||
// TODO: 判断文件不存在
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
void initOssClient(String endpoint, String mode, String ak, String sk, String token) throws Exception {
|
||||
|
||||
log.info("[AliOssService] init OSS by config: {},{},{},{},{}", endpoint, mode, ak, sk, token);
|
||||
|
||||
CredentialsProvider credentialsProvider;
|
||||
CredentialType credentialType = CredentialType.parse(mode);
|
||||
switch (credentialType) {
|
||||
case PWD:
|
||||
credentialsProvider = new DefaultCredentialProvider(ak, sk, token);
|
||||
break;
|
||||
case SYSTEM_PROPERTY:
|
||||
credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider();
|
||||
break;
|
||||
default:
|
||||
credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
|
||||
}
|
||||
|
||||
this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider);
|
||||
log.info("[AliOssService] initialize OSS successfully!");
|
||||
}
|
||||
|
||||
|
||||
@AllArgsConstructor
|
||||
enum CredentialType {
|
||||
/**
|
||||
* 从环境读取
|
||||
*/
|
||||
ENV("env"),
|
||||
/**
|
||||
* 系统配置
|
||||
*/
|
||||
SYSTEM_PROPERTY("sys"),
|
||||
/**
|
||||
* 从账号密码读取
|
||||
*/
|
||||
PWD("pwd")
|
||||
;
|
||||
|
||||
private final String code;
|
||||
|
||||
/**
|
||||
* parse credential type
|
||||
* @param mode oms.storage.dfs.alioss.credential_type
|
||||
* @return CredentialType
|
||||
*/
|
||||
public static CredentialType parse(String mode) {
|
||||
|
||||
for (CredentialType credentialType : values()) {
|
||||
if (StringUtils.equalsIgnoreCase(credentialType.code, mode)) {
|
||||
return credentialType;
|
||||
}
|
||||
}
|
||||
|
||||
return PWD;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -83,6 +83,7 @@ public class GridFsService extends AbstractDFsService implements InitializingBea
|
||||
}
|
||||
return Optional.of(new FileMeta()
|
||||
.setLength(first.getLength())
|
||||
.setLastModifiedTime(first.getUploadDate())
|
||||
.setMetaInfo(first.getMetadata()));
|
||||
}
|
||||
|
||||
@ -109,7 +110,25 @@ public class GridFsService extends AbstractDFsService implements InitializingBea
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
String uri = parseMongoUri(environment);
|
||||
String uri = parseMongoUri();
|
||||
initMongo(uri);
|
||||
}
|
||||
|
||||
private GridFSBucket getBucket(String bucketName) {
|
||||
return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
|
||||
}
|
||||
|
||||
private String parseMongoUri() {
|
||||
// 优先从新的规则读取
|
||||
String uri = fetchProperty(TYPE_MONGO, "uri");
|
||||
if (StringUtils.isNotEmpty(uri)) {
|
||||
return uri;
|
||||
}
|
||||
// 兼容 4.3.3 前的逻辑,读取 SpringMongoDB 配置
|
||||
return environment.getProperty(SPRING_MONGO_DB_CONFIG_KEY);
|
||||
}
|
||||
|
||||
private void initMongo(String uri) {
|
||||
log.info("[GridFsService] mongoDB uri: {}", uri);
|
||||
if (StringUtils.isEmpty(uri)) {
|
||||
log.warn("[GridFsService] uri is empty, GridFsService is off now!");
|
||||
@ -124,18 +143,4 @@ public class GridFsService extends AbstractDFsService implements InitializingBea
|
||||
|
||||
log.info("[GridFsService] turn on mongodb GridFs as storage layer.");
|
||||
}
|
||||
|
||||
private GridFSBucket getBucket(String bucketName) {
|
||||
return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
|
||||
}
|
||||
|
||||
static String parseMongoUri(Environment environment) {
|
||||
// 优先从新的规则读取
|
||||
String uri = fetchProperty(environment, TYPE_MONGO, "uri");
|
||||
if (StringUtils.isNotEmpty(uri)) {
|
||||
return uri;
|
||||
}
|
||||
// 兼容 4.3.3 前的逻辑,读取 SpringMongoDB 配置
|
||||
return environment.getProperty(SPRING_MONGO_DB_CONFIG_KEY);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user