diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml
index 0aac969f..6c762e93 100644
--- a/powerjob-server/pom.xml
+++ b/powerjob-server/pom.xml
@@ -54,6 +54,7 @@
4.3.3
4.3.3
1.6.14
+ 3.17.1
@@ -99,12 +100,18 @@
${project.version}
-
+
org.mongodb
mongodb-driver-sync
${mongodb-driver-sync.version}
+
+
+ com.aliyun.oss
+ aliyun-sdk-oss
+ ${aliyun-sdk-oss.version}
+
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java
index 9fb7656a..077701aa 100644
--- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java
+++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java
@@ -3,6 +3,7 @@ package tech.powerjob.server.extension.dfs;
import lombok.Data;
import lombok.experimental.Accessors;
+import java.util.Date;
import java.util.Map;
/**
@@ -19,6 +20,10 @@ public class FileMeta {
* 文件大小
*/
private long length;
+ /**
+ * 最后修改时间
+ */
+ private Date lastModifiedTime;
/**
* 元数据
diff --git a/powerjob-server/powerjob-server-persistence/pom.xml b/powerjob-server/powerjob-server-persistence/pom.xml
index a1d526b2..d476400b 100644
--- a/powerjob-server/powerjob-server-persistence/pom.xml
+++ b/powerjob-server/powerjob-server-persistence/pom.xml
@@ -36,6 +36,10 @@
org.mongodb
mongodb-driver-sync
+
+ com.aliyun.oss
+ aliyun-sdk-oss
+
\ No newline at end of file
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java
index a6d6ec56..99b2c72a 100644
--- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java
@@ -29,7 +29,7 @@ public abstract class AbstractDFsService implements DFsService, InitializingBean
active = true;
}
- protected static String fetchProperty(Environment environment, String dfsType, String key) {
+ protected String fetchProperty(String dfsType, String key) {
String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key);
return environment.getProperty(pKey);
}
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java
index 8ab4d997..321dc8b6 100644
--- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java
@@ -1,6 +1,17 @@
package tech.powerjob.server.persistence.storage.impl;
+import com.aliyun.oss.*;
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import com.aliyun.oss.common.auth.CredentialsProviderFactory;
+import com.aliyun.oss.common.auth.DefaultCredentialProvider;
+import com.aliyun.oss.model.DownloadFileRequest;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.aliyun.oss.model.PutObjectRequest;
+import com.google.common.collect.Maps;
+import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@@ -8,38 +19,152 @@ import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.storage.AbstractDFsService;
import java.io.IOException;
+import java.util.Map;
import java.util.Optional;
/**
* Alibaba OSS support
* 海量、安全、低成本、高可靠的云存储服务
+ * 配置项:
+ * oms.storage.dfs.alioss.endpoint
+ * oms.storage.dfs.alioss.credential_type
+ * oms.storage.dfs.alioss.ak
+ * oms.storage.dfs.alioss.sk
+ * oms.storage.dfs.alioss.token
*
* @author tjq
* @since 2023/7/30
*/
@Slf4j
@Service
-@ConditionalOnProperty(name = {"oms.storage.dfs.alioss.uri"}, matchIfMissing = false)
+@ConditionalOnProperty(name = {"oms.storage.dfs.alioss.endpoint"}, matchIfMissing = false)
@ConditionalOnMissingBean(DFsService.class)
public class AliOssService extends AbstractDFsService {
+ private static final String TYPE_ALI_OSS = "alioss";
+
+ private static final String KEY_ENDPOINT = "endpoint";
+ private static final String KEY_CREDENTIAL_TYPE = "credential_type";
+ private static final String KEY_AK = "ak";
+ private static final String KEY_SK = "sk";
+ private static final String KEY_TOKEN = "token";
+
+ private OSS oss;
+
+ private static final int DOWNLOAD_PART_SIZE = 10240;
+
@Override
public void afterPropertiesSet() throws Exception {
+ String endpoint = fetchProperty(TYPE_ALI_OSS, KEY_ENDPOINT);
+ String ct = fetchProperty(TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE);
+ String ak = fetchProperty(TYPE_ALI_OSS, KEY_AK);
+ String sk = fetchProperty(TYPE_ALI_OSS, KEY_SK);
+ String token = fetchProperty(TYPE_ALI_OSS, KEY_TOKEN);
+
+ initOssClient(endpoint, ct, ak, sk, token);
}
@Override
public void store(StoreRequest storeRequest) throws IOException {
+ FileLocation dfl = storeRequest.getFileLocation();
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+
+ PutObjectRequest putObjectRequest = new PutObjectRequest(dfl.getBucket(), dfl.getName(), storeRequest.getLocalFile(), objectMetadata);
+ oss.putObject(putObjectRequest);
}
@Override
public void download(DownloadRequest downloadRequest) throws IOException {
+ FileLocation dfl = downloadRequest.getFileLocation();
+ DownloadFileRequest downloadFileRequest = new DownloadFileRequest(dfl.getBucket(), dfl.getName(), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE);
+ try {
+ oss.downloadFile(downloadFileRequest);
+ } catch (Throwable t) {
+ ExceptionUtils.rethrow(t);
+ }
}
@Override
public Optional fetchFileMeta(FileLocation fileLocation) throws IOException {
+ try {
+ ObjectMetadata objectMetadata = oss.getObjectMetadata(fileLocation.getBucket(), fileLocation.getName());
+ return Optional.ofNullable(objectMetadata).map(ossM -> {
+
+ Map metaInfo = Maps.newHashMap();
+ metaInfo.putAll(ossM.getRawMetadata());
+ if (ossM.getUserMetadata() != null) {
+ metaInfo.putAll(ossM.getUserMetadata());
+ }
+
+ return new FileMeta()
+ .setLastModifiedTime(ossM.getLastModified())
+ .setLength(ossM.getContentLength())
+ .setMetaInfo(metaInfo);
+ });
+ } catch (OSSException oe) {
+ // TODO: 判断文件不存在
+ }
return Optional.empty();
}
+
+ void initOssClient(String endpoint, String mode, String ak, String sk, String token) throws Exception {
+
+ log.info("[AliOssService] init OSS by config: {},{},{},{},{}", endpoint, mode, ak, sk, token);
+
+ CredentialsProvider credentialsProvider;
+ CredentialType credentialType = CredentialType.parse(mode);
+ switch (credentialType) {
+ case PWD:
+ credentialsProvider = new DefaultCredentialProvider(ak, sk, token);
+ break;
+ case SYSTEM_PROPERTY:
+ credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider();
+ break;
+ default:
+ credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
+ }
+
+ this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider);
+ log.info("[AliOssService] initialize OSS successfully!");
+ }
+
+
+ @AllArgsConstructor
+ enum CredentialType {
+ /**
+ * 从环境读取
+ */
+ ENV("env"),
+ /**
+ * 系统配置
+ */
+ SYSTEM_PROPERTY("sys"),
+ /**
+ * 从账号密码读取
+ */
+ PWD("pwd")
+ ;
+
+ private final String code;
+
+ /**
+ * parse credential type
+ * @param mode oms.storage.dfs.alioss.credential_type
+ * @return CredentialType
+ */
+ public static CredentialType parse(String mode) {
+
+ for (CredentialType credentialType : values()) {
+ if (StringUtils.equalsIgnoreCase(credentialType.code, mode)) {
+ return credentialType;
+ }
+ }
+
+ return PWD;
+ }
+
+ }
}
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java
index 7c1e131d..68e973f8 100644
--- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java
@@ -83,6 +83,7 @@ public class GridFsService extends AbstractDFsService implements InitializingBea
}
return Optional.of(new FileMeta()
.setLength(first.getLength())
+ .setLastModifiedTime(first.getUploadDate())
.setMetaInfo(first.getMetadata()));
}
@@ -109,7 +110,25 @@ public class GridFsService extends AbstractDFsService implements InitializingBea
@Override
public void afterPropertiesSet() throws Exception {
- String uri = parseMongoUri(environment);
+ String uri = parseMongoUri();
+ initMongo(uri);
+ }
+
+ private GridFSBucket getBucket(String bucketName) {
+ return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
+ }
+
+ private String parseMongoUri() {
+ // 优先从新的规则读取
+ String uri = fetchProperty(TYPE_MONGO, "uri");
+ if (StringUtils.isNotEmpty(uri)) {
+ return uri;
+ }
+ // 兼容 4.3.3 前的逻辑,读取 SpringMongoDB 配置
+ return environment.getProperty(SPRING_MONGO_DB_CONFIG_KEY);
+ }
+
+ private void initMongo(String uri) {
log.info("[GridFsService] mongoDB uri: {}", uri);
if (StringUtils.isEmpty(uri)) {
log.warn("[GridFsService] uri is empty, GridFsService is off now!");
@@ -124,18 +143,4 @@ public class GridFsService extends AbstractDFsService implements InitializingBea
log.info("[GridFsService] turn on mongodb GridFs as storage layer.");
}
-
- private GridFSBucket getBucket(String bucketName) {
- return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
- }
-
- static String parseMongoUri(Environment environment) {
- // 优先从新的规则读取
- String uri = fetchProperty(environment, TYPE_MONGO, "uri");
- if (StringUtils.isNotEmpty(uri)) {
- return uri;
- }
- // 兼容 4.3.3 前的逻辑,读取 SpringMongoDB 配置
- return environment.getProperty(SPRING_MONGO_DB_CONFIG_KEY);
- }
}