mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
fix: MinioOssService is not working properly #844
This commit is contained in:
parent
9c0731f20d
commit
fb1159e1b5
@ -55,7 +55,7 @@
|
|||||||
<powerjob-remote-impl-akka.version>4.3.8</powerjob-remote-impl-akka.version>
|
<powerjob-remote-impl-akka.version>4.3.8</powerjob-remote-impl-akka.version>
|
||||||
<springdoc-openapi-ui.version>1.6.14</springdoc-openapi-ui.version>
|
<springdoc-openapi-ui.version>1.6.14</springdoc-openapi-ui.version>
|
||||||
<aliyun-sdk-oss.version>3.17.1</aliyun-sdk-oss.version>
|
<aliyun-sdk-oss.version>3.17.1</aliyun-sdk-oss.version>
|
||||||
<minio.version>8.5.2</minio.version>
|
<aws-java-sdk-s3.version>1.12.665</aws-java-sdk-s3.version>
|
||||||
<commons-collections4.version>4.4</commons-collections4.version>
|
<commons-collections4.version>4.4</commons-collections4.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
@ -114,12 +114,13 @@
|
|||||||
<artifactId>aliyun-sdk-oss</artifactId>
|
<artifactId>aliyun-sdk-oss</artifactId>
|
||||||
<version>${aliyun-sdk-oss.version}</version>
|
<version>${aliyun-sdk-oss.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!-- 存储扩展-Minio,未使用可移除 -->
|
<!-- 存储扩展-Minio/S3,未使用可移除(minio-client 依赖 OKHTTP4.x 版本,强制引入 kotlin 标准库,为了防止引入更多问题放弃) -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.minio</groupId>
|
<groupId>com.amazonaws</groupId>
|
||||||
<artifactId>minio</artifactId>
|
<artifactId>aws-java-sdk-s3</artifactId>
|
||||||
<version>${minio.version}</version>
|
<version>${aws-java-sdk-s3.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-collections4 -->
|
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-collections4 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.commons</groupId>
|
<groupId>org.apache.commons</groupId>
|
||||||
|
@ -41,8 +41,8 @@
|
|||||||
<artifactId>aliyun-sdk-oss</artifactId>
|
<artifactId>aliyun-sdk-oss</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.minio</groupId>
|
<groupId>com.amazonaws</groupId>
|
||||||
<artifactId>minio</artifactId>
|
<artifactId>aws-java-sdk-s3</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
@ -1,8 +1,13 @@
|
|||||||
package tech.powerjob.server.persistence.storage.impl;
|
package tech.powerjob.server.persistence.storage.impl;
|
||||||
|
|
||||||
|
import com.amazonaws.auth.AWSStaticCredentialsProvider;
|
||||||
|
import com.amazonaws.auth.BasicAWSCredentials;
|
||||||
|
import com.amazonaws.client.builder.AwsClientBuilder;
|
||||||
|
import com.amazonaws.services.s3.AmazonS3;
|
||||||
|
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
|
||||||
|
import com.amazonaws.services.s3.model.*;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import io.minio.*;
|
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
@ -16,8 +21,6 @@ import tech.powerjob.server.extension.dfs.*;
|
|||||||
import tech.powerjob.server.persistence.storage.AbstractDFsService;
|
import tech.powerjob.server.persistence.storage.AbstractDFsService;
|
||||||
|
|
||||||
import javax.annotation.Priority;
|
import javax.annotation.Priority;
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
@ -44,19 +47,19 @@ public class MinioOssService extends AbstractDFsService {
|
|||||||
private static final String KEY_BUCKET_NAME = "bucketName";
|
private static final String KEY_BUCKET_NAME = "bucketName";
|
||||||
private static final String ACCESS_KEY = "accessKey";
|
private static final String ACCESS_KEY = "accessKey";
|
||||||
private static final String SECRET_KEY = "secretKey";
|
private static final String SECRET_KEY = "secretKey";
|
||||||
private MinioClient minioClient;
|
private AmazonS3 amazonS3;
|
||||||
private String bucket;
|
private String bucket;
|
||||||
private static final String NO_SUCH_KEY = "NoSuchKey";
|
private static final String NOT_FOUNT = "404 Not Found";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void store(StoreRequest storeRequest) {
|
public void store(StoreRequest storeRequest) {
|
||||||
try {
|
try {
|
||||||
minioClient.uploadObject(UploadObjectArgs.builder()
|
|
||||||
.bucket(this.bucket)
|
String fileName = parseFileName(storeRequest.getFileLocation());
|
||||||
.object(parseFileName(storeRequest.getFileLocation()))
|
// 创建 PutObjectRequest 对象
|
||||||
.filename(storeRequest.getLocalFile().getPath())
|
PutObjectRequest request = new PutObjectRequest(this.bucket, fileName, storeRequest.getLocalFile());
|
||||||
.contentType(Files.probeContentType(storeRequest.getLocalFile().toPath()))
|
|
||||||
.build());
|
amazonS3.putObject(request);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
ExceptionUtils.rethrow(t);
|
ExceptionUtils.rethrow(t);
|
||||||
}
|
}
|
||||||
@ -66,13 +69,11 @@ public class MinioOssService extends AbstractDFsService {
|
|||||||
public void download(DownloadRequest downloadRequest) {
|
public void download(DownloadRequest downloadRequest) {
|
||||||
try {
|
try {
|
||||||
FileUtils.forceMkdirParent(downloadRequest.getTarget());
|
FileUtils.forceMkdirParent(downloadRequest.getTarget());
|
||||||
// 下载文件
|
|
||||||
minioClient.downloadObject(
|
String fileName = parseFileName(downloadRequest.getFileLocation());
|
||||||
DownloadObjectArgs.builder()
|
GetObjectRequest getObjectRequest = new GetObjectRequest(this.bucket, fileName);
|
||||||
.bucket(this.bucket)
|
amazonS3.getObject(getObjectRequest, downloadRequest.getTarget());
|
||||||
.object(parseFileName(downloadRequest.getFileLocation()))
|
|
||||||
.filename(downloadRequest.getTarget().getAbsolutePath())
|
|
||||||
.build());
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
ExceptionUtils.rethrow(t);
|
ExceptionUtils.rethrow(t);
|
||||||
}
|
}
|
||||||
@ -86,26 +87,28 @@ public class MinioOssService extends AbstractDFsService {
|
|||||||
@Override
|
@Override
|
||||||
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) {
|
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) {
|
||||||
try {
|
try {
|
||||||
StatObjectResponse stat = minioClient.statObject(StatObjectArgs.builder()
|
|
||||||
.bucket(this.bucket)
|
String fileName = parseFileName(fileLocation);
|
||||||
.object(parseFileName(fileLocation))
|
ObjectMetadata objectMetadata = amazonS3.getObjectMetadata(this.bucket, fileName);
|
||||||
.build());
|
|
||||||
return Optional.ofNullable(stat).map(minioStat -> {
|
return Optional.ofNullable(objectMetadata).map(minioStat -> {
|
||||||
|
|
||||||
Map<String, Object> metaInfo = Maps.newHashMap();
|
Map<String, Object> metaInfo = Maps.newHashMap();
|
||||||
if (stat.userMetadata() != null) {
|
|
||||||
metaInfo.putAll(stat.userMetadata());
|
if (objectMetadata.getRawMetadata() != null) {
|
||||||
|
metaInfo.putAll(objectMetadata.getRawMetadata());
|
||||||
}
|
}
|
||||||
return new FileMeta()
|
return new FileMeta()
|
||||||
.setLastModifiedTime(Date.from(stat.lastModified().toInstant()))
|
.setLastModifiedTime(objectMetadata.getLastModified())
|
||||||
.setLength(stat.size())
|
.setLength(objectMetadata.getContentLength())
|
||||||
.setMetaInfo(metaInfo);
|
.setMetaInfo(metaInfo);
|
||||||
});
|
});
|
||||||
} catch (Exception oe) {
|
} catch (AmazonS3Exception s3Exception) {
|
||||||
String errorCode = oe.getMessage();
|
String errorCode = s3Exception.getErrorCode();
|
||||||
if (NO_SUCH_KEY.equalsIgnoreCase(errorCode)) {
|
if (NOT_FOUNT.equalsIgnoreCase(errorCode)) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
} catch (Exception oe) {
|
||||||
ExceptionUtils.rethrow(oe);
|
ExceptionUtils.rethrow(oe);
|
||||||
}
|
}
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
@ -170,8 +173,18 @@ public class MinioOssService extends AbstractDFsService {
|
|||||||
if (StringUtils.isEmpty(bucketName)) {
|
if (StringUtils.isEmpty(bucketName)) {
|
||||||
throw new IllegalArgumentException("'oms.storage.dfs.minio.bucketName' can't be empty, please creat a bucket in minio oss console then config it to powerjob");
|
throw new IllegalArgumentException("'oms.storage.dfs.minio.bucketName' can't be empty, please creat a bucket in minio oss console then config it to powerjob");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 创建凭证对象
|
||||||
|
BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);
|
||||||
|
|
||||||
|
// 创建AmazonS3客户端并指定终端节点和凭证
|
||||||
|
this.amazonS3 = AmazonS3ClientBuilder.standard()
|
||||||
|
// 当使用 AWS Java SDK 连接到非AWS服务(如MinIO)时,指定区域(Region)是必需的,即使这个区域对于你的MinIO实例并不真正适用。原因在于AWS SDK的客户端构建器需要一个区域来配置其服务端点,即使在连接到本地或第三方S3兼容服务时也是如此。使用 "us-east-1" 作为占位符是很常见的做法,因为它是AWS最常用的区域之一。这不会影响到实际的连接或数据传输,因为真正的服务地址是由你提供的终端节点URL决定的。如果你的代码主要是与MinIO交互,并且不涉及AWS服务,那么这个区域设置只是形式上的要求。
|
||||||
|
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, "us-east-1"))
|
||||||
|
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
|
||||||
|
.withPathStyleAccessEnabled(true) // 重要:启用路径样式访问
|
||||||
|
.build();
|
||||||
this.bucket = bucketName;
|
this.bucket = bucketName;
|
||||||
minioClient = MinioClient.builder().endpoint(endpoint).credentials(accessKey, secretKey).build();
|
|
||||||
createBucket(bucketName);
|
createBucket(bucketName);
|
||||||
log.info("[Minio] initialize OSS successfully!");
|
log.info("[Minio] initialize OSS successfully!");
|
||||||
}
|
}
|
||||||
@ -183,10 +196,13 @@ public class MinioOssService extends AbstractDFsService {
|
|||||||
*/
|
*/
|
||||||
@SneakyThrows(Exception.class)
|
@SneakyThrows(Exception.class)
|
||||||
public void createBucket(String bucketName) {
|
public void createBucket(String bucketName) {
|
||||||
if (!bucketExists(bucketName)) {
|
if (bucketExists(bucketName)) {
|
||||||
minioClient.makeBucket(MakeBucketArgs.builder()
|
return;
|
||||||
.bucket(bucketName).build());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Bucket createBucketResult = amazonS3.createBucket(bucketName);
|
||||||
|
log.info("[Minio] createBucket successfully, bucketName: {}, createResult: {}", bucketName, createBucketResult);
|
||||||
|
|
||||||
String policy = "{\n" +
|
String policy = "{\n" +
|
||||||
" \"Version\": \"2012-10-17\",\n" +
|
" \"Version\": \"2012-10-17\",\n" +
|
||||||
" \"Statement\": [\n" +
|
" \"Statement\": [\n" +
|
||||||
@ -206,7 +222,11 @@ public class MinioOssService extends AbstractDFsService {
|
|||||||
" }\n" +
|
" }\n" +
|
||||||
" ]\n" +
|
" ]\n" +
|
||||||
"}";
|
"}";
|
||||||
minioClient.setBucketPolicy(SetBucketPolicyArgs.builder().bucket(bucketName).config(policy).build());
|
try {
|
||||||
|
amazonS3.setBucketPolicy(bucketName, policy);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("[Minio] setBucketPolicy failed, maybe you need to setBucketPolicy by yourself!", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -217,7 +237,8 @@ public class MinioOssService extends AbstractDFsService {
|
|||||||
*/
|
*/
|
||||||
@SneakyThrows(Exception.class)
|
@SneakyThrows(Exception.class)
|
||||||
public boolean bucketExists(String bucketName) {
|
public boolean bucketExists(String bucketName) {
|
||||||
return minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
|
|
||||||
|
return amazonS3.doesBucketExistV2(bucketName);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class MinioOssCondition extends PropertyAndOneBeanCondition {
|
public static class MinioOssCondition extends PropertyAndOneBeanCondition {
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
package tech.powerjob.server.persistence.storage.impl;
|
package tech.powerjob.server.persistence.storage.impl;
|
||||||
|
|
||||||
import com.aliyun.oss.common.utils.AuthUtils;
|
import com.aliyun.oss.common.utils.AuthUtils;
|
||||||
import com.aliyun.oss.common.utils.StringUtils;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||||
import tech.powerjob.server.extension.dfs.DFsService;
|
import tech.powerjob.server.extension.dfs.DFsService;
|
||||||
|
|
||||||
@ -33,7 +33,7 @@ class AliOssServiceTest extends AbstractDfsServiceTest {
|
|||||||
|
|
||||||
log.info("[AliOssServiceTest] ak: {}, sk: {}", accessKeyId, secretAccessKey);
|
log.info("[AliOssServiceTest] ak: {}, sk: {}", accessKeyId, secretAccessKey);
|
||||||
|
|
||||||
if (org.apache.commons.lang3.StringUtils.isAnyEmpty(accessKeyId, secretAccessKey)) {
|
if (StringUtils.isAnyEmpty(accessKeyId, secretAccessKey)) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,29 @@
|
|||||||
|
package tech.powerjob.server.persistence.storage.impl;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||||
|
import tech.powerjob.server.extension.dfs.DFsService;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* desc
|
||||||
|
*
|
||||||
|
* @author tjq
|
||||||
|
* @since 2024/2/26
|
||||||
|
*/
|
||||||
|
class MinioOssServiceTest extends AbstractDfsServiceTest {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Optional<DFsService> fetchService() {
|
||||||
|
try {
|
||||||
|
MinioOssService aliOssService = new MinioOssService();
|
||||||
|
aliOssService.initOssClient("http://192.168.124.23:9000", "pj2","testAk", "testSktestSktestSk");
|
||||||
|
return Optional.of(aliOssService);
|
||||||
|
} catch (Exception e) {
|
||||||
|
ExceptionUtils.rethrow(e);
|
||||||
|
}
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user