mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: [storageExt] use PropertyAndOneBeanCondition to control multi impl
This commit is contained in:
parent
b251df4c35
commit
1c70bbc670
@ -0,0 +1,95 @@
|
||||
package tech.powerjob.server.common.spring.condition;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
|
||||
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
|
||||
import org.springframework.context.annotation.Condition;
|
||||
import org.springframework.context.annotation.ConditionContext;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.core.type.AnnotatedTypeMetadata;
|
||||
import tech.powerjob.common.utils.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* PropertyAndOneBeanCondition
|
||||
* 存在多个接口实现时的唯一规则
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2023/7/30
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class PropertyAndOneBeanCondition implements Condition {
|
||||
|
||||
/**
|
||||
* 配置中存在任意一个 Key 即可加载该 Bean,空代表不校验
|
||||
* @return Keys
|
||||
*/
|
||||
protected abstract List<String> anyConfigKey();
|
||||
|
||||
/**
|
||||
* Bean 唯一性校验,空代表不校验
|
||||
* @return beanType
|
||||
*/
|
||||
protected abstract Class<?> beanType();
|
||||
|
||||
@Override
|
||||
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
|
||||
|
||||
boolean anyCfgExist = checkAnyConfigExist(context);
|
||||
log.info("[PropertyAndOneBeanCondition] [{}] check any config exist result with keys={}: {}", thisName(), anyConfigKey(), anyCfgExist);
|
||||
if (!anyCfgExist) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Class<?> beanType = beanType();
|
||||
if (beanType == null) {
|
||||
return true;
|
||||
}
|
||||
boolean exist = checkBeanExist(context);
|
||||
log.info("[PropertyAndOneBeanCondition] [{}] bean of type[{}] exist check result: {}", thisName(), beanType.getSimpleName(), exist);
|
||||
if (exist) {
|
||||
log.info("[PropertyAndOneBeanCondition] [{}] bean of type[{}] already exist, skip load!", thisName(), beanType.getSimpleName());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean checkAnyConfigExist(ConditionContext context) {
|
||||
Environment environment = context.getEnvironment();
|
||||
|
||||
List<String> keys = anyConfigKey();
|
||||
|
||||
if (CollectionUtils.isEmpty(keys)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 判断前缀是否符合,任意满足即可
|
||||
for (String key : keys) {
|
||||
if (StringUtils.isNotEmpty(environment.getProperty(key))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean checkBeanExist(ConditionContext context) {
|
||||
|
||||
ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
|
||||
if (beanFactory == null) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
beanFactory.getBean(beanType());
|
||||
return true;
|
||||
} catch (NoSuchBeanDefinitionException ignore) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private String thisName() {
|
||||
return this.getClass().getSimpleName();
|
||||
}
|
||||
}
|
@ -0,0 +1,7 @@
|
||||
/**
|
||||
* Spring 通用能力包
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2023/7/30
|
||||
*/
|
||||
package tech.powerjob.server.common.spring;
|
@ -1,27 +1,38 @@
|
||||
package tech.powerjob.server.persistence.storage;
|
||||
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.core.env.Environment;
|
||||
import tech.powerjob.server.extension.dfs.DFsService;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* AbstractDFsService
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2023/7/28
|
||||
*/
|
||||
public abstract class AbstractDFsService implements DFsService, InitializingBean {
|
||||
@Slf4j
|
||||
public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean {
|
||||
|
||||
@Resource
|
||||
protected Environment environment;
|
||||
public AbstractDFsService() {
|
||||
log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName());
|
||||
}
|
||||
|
||||
abstract protected void init(ApplicationContext applicationContext);
|
||||
|
||||
protected static final String PROPERTY_KEY = "oms.storage.dfs";
|
||||
|
||||
protected String fetchProperty(String dfsType, String key) {
|
||||
protected static String fetchProperty(Environment environment, String dfsType, String key) {
|
||||
String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key);
|
||||
return environment.getProperty(pKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName());
|
||||
init(applicationContext);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,37 @@
|
||||
package tech.powerjob.server.persistence.storage;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Conditional;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import tech.powerjob.server.extension.dfs.DFsService;
|
||||
import tech.powerjob.server.persistence.storage.impl.AliOssService;
|
||||
import tech.powerjob.server.persistence.storage.impl.EmptyDFsService;
|
||||
import tech.powerjob.server.persistence.storage.impl.GridFsService;
|
||||
|
||||
/**
|
||||
* Description
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2023/7/30
|
||||
*/
|
||||
@Configuration
|
||||
public class StorageConfiguration {
|
||||
|
||||
@Bean
|
||||
@Conditional(GridFsService.GridFsCondition.class)
|
||||
public DFsService initGridFs() {
|
||||
return new GridFsService();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@Conditional(AliOssService.AliOssCondition.class)
|
||||
public DFsService initAliOssFs() {
|
||||
return new AliOssService();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@Conditional(EmptyDFsService.EmptyCondition.class)
|
||||
public DFsService initEmptyDfs() {
|
||||
return new EmptyDFsService();
|
||||
}
|
||||
}
|
@ -1,12 +1,15 @@
|
||||
package tech.powerjob.server.persistence.storage.impl;
|
||||
|
||||
import com.aliyun.oss.*;
|
||||
import com.aliyun.oss.OSS;
|
||||
import com.aliyun.oss.OSSClientBuilder;
|
||||
import com.aliyun.oss.OSSException;
|
||||
import com.aliyun.oss.common.auth.CredentialsProvider;
|
||||
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
|
||||
import com.aliyun.oss.common.auth.DefaultCredentialProvider;
|
||||
import com.aliyun.oss.model.DownloadFileRequest;
|
||||
import com.aliyun.oss.model.ObjectMetadata;
|
||||
import com.aliyun.oss.model.PutObjectRequest;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
@ -14,13 +17,16 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Conditional;
|
||||
import org.springframework.core.env.Environment;
|
||||
import tech.powerjob.server.extension.dfs.*;
|
||||
import tech.powerjob.server.persistence.storage.AbstractDFsService;
|
||||
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
|
||||
|
||||
import javax.annotation.Priority;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@ -39,9 +45,8 @@ import java.util.Optional;
|
||||
* @since 2023/7/30
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@ConditionalOnProperty(name = {"oms.storage.dfs.alioss.endpoint"}, matchIfMissing = false)
|
||||
@ConditionalOnMissingBean(DFsService.class)
|
||||
@Priority(value = Integer.MAX_VALUE - 1)
|
||||
@Conditional(AliOssService.AliOssCondition.class)
|
||||
public class AliOssService extends AbstractDFsService {
|
||||
|
||||
private static final String TYPE_ALI_OSS = "alioss";
|
||||
@ -60,19 +65,6 @@ public class AliOssService extends AbstractDFsService {
|
||||
|
||||
private static final String NO_SUCH_KEY = "NoSuchKey";
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
|
||||
String endpoint = fetchProperty(TYPE_ALI_OSS, KEY_ENDPOINT);
|
||||
String bkt = fetchProperty(TYPE_ALI_OSS, KEY_BUCKET);
|
||||
String ct = fetchProperty(TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE);
|
||||
String ak = fetchProperty(TYPE_ALI_OSS, KEY_AK);
|
||||
String sk = fetchProperty(TYPE_ALI_OSS, KEY_SK);
|
||||
String token = fetchProperty(TYPE_ALI_OSS, KEY_TOKEN);
|
||||
|
||||
initOssClient(endpoint, bkt, ct, ak, sk, token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void store(StoreRequest storeRequest) throws IOException {
|
||||
|
||||
@ -162,6 +154,29 @@ public class AliOssService extends AbstractDFsService {
|
||||
*/
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
oss.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init(ApplicationContext applicationContext) {
|
||||
Environment environment = applicationContext.getEnvironment();
|
||||
|
||||
String endpoint = fetchProperty(environment, TYPE_ALI_OSS, KEY_ENDPOINT);
|
||||
String bkt = fetchProperty(environment, TYPE_ALI_OSS, KEY_BUCKET);
|
||||
String ct = fetchProperty(environment, TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE);
|
||||
String ak = fetchProperty(environment, TYPE_ALI_OSS, KEY_AK);
|
||||
String sk = fetchProperty(environment, TYPE_ALI_OSS, KEY_SK);
|
||||
String token = fetchProperty(environment, TYPE_ALI_OSS, KEY_TOKEN);
|
||||
|
||||
try {
|
||||
initOssClient(endpoint, bkt, ct, ak, sk, token);
|
||||
} catch (Exception e) {
|
||||
ExceptionUtils.rethrow(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
enum CredentialType {
|
||||
@ -198,4 +213,17 @@ public class AliOssService extends AbstractDFsService {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class AliOssCondition extends PropertyAndOneBeanCondition {
|
||||
|
||||
@Override
|
||||
protected List<String> anyConfigKey() {
|
||||
return Lists.newArrayList("oms.storage.dfs.alioss.endpoint");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<?> beanType() {
|
||||
return DFsService.class;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,12 +1,14 @@
|
||||
package tech.powerjob.server.persistence.storage.impl;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Conditional;
|
||||
import tech.powerjob.server.extension.dfs.*;
|
||||
import tech.powerjob.server.persistence.storage.AbstractDFsService;
|
||||
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
|
||||
|
||||
import javax.annotation.Priority;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
@ -15,13 +17,10 @@ import java.util.Optional;
|
||||
* @author tjq
|
||||
* @since 2023/7/30
|
||||
*/
|
||||
@Service
|
||||
@Order(value = Ordered.LOWEST_PRECEDENCE)
|
||||
@ConditionalOnMissingBean(DFsService.class)
|
||||
public class EmptyDFsService implements DFsService {
|
||||
@Priority(value = Integer.MAX_VALUE)
|
||||
@Conditional(EmptyDFsService.EmptyCondition.class)
|
||||
public class EmptyDFsService extends AbstractDFsService {
|
||||
|
||||
public EmptyDFsService() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void store(StoreRequest storeRequest) throws IOException {
|
||||
@ -35,4 +34,26 @@ public class EmptyDFsService implements DFsService {
|
||||
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init(ApplicationContext applicationContext) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static class EmptyCondition extends PropertyAndOneBeanCondition {
|
||||
@Override
|
||||
protected List<String> anyConfigKey() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<?> beanType() {
|
||||
return DFsService.class;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package tech.powerjob.server.persistence.storage.impl;
|
||||
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.mongodb.ConnectionString;
|
||||
import com.mongodb.client.MongoClient;
|
||||
@ -17,17 +18,20 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
import org.bson.conversions.Bson;
|
||||
import org.bson.types.ObjectId;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Conditional;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.stereotype.Service;
|
||||
import tech.powerjob.server.extension.dfs.*;
|
||||
import tech.powerjob.server.persistence.storage.AbstractDFsService;
|
||||
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
|
||||
|
||||
import java.io.*;
|
||||
import javax.annotation.Priority;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@ -39,15 +43,17 @@ import java.util.Optional;
|
||||
* @since 2023/7/28
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@ConditionalOnProperty(name = {"oms.storage.dfs.mongodb.uri", "spring.data.mongodb.uri"}, matchIfMissing = false)
|
||||
@ConditionalOnMissingBean(DFsService.class)
|
||||
public class GridFsService extends AbstractDFsService implements InitializingBean {
|
||||
@Priority(value = Integer.MAX_VALUE - 10)
|
||||
@Conditional(GridFsService.GridFsCondition.class)
|
||||
public class GridFsService extends AbstractDFsService {
|
||||
|
||||
private MongoClient mongoClient;
|
||||
private MongoDatabase db;
|
||||
private final Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap();
|
||||
private static final String TYPE_MONGO = "mongodb";
|
||||
|
||||
private static final String KEY_URI = "uri";
|
||||
|
||||
private static final String SPRING_MONGO_DB_CONFIG_KEY = "spring.data.mongodb.uri";
|
||||
|
||||
@Override
|
||||
@ -108,19 +114,13 @@ public class GridFsService extends AbstractDFsService implements InitializingBea
|
||||
log.info("[GridFsService] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
String uri = parseMongoUri();
|
||||
initMongo(uri);
|
||||
}
|
||||
|
||||
private GridFSBucket getBucket(String bucketName) {
|
||||
return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
|
||||
}
|
||||
|
||||
private String parseMongoUri() {
|
||||
private String parseMongoUri(Environment environment) {
|
||||
// 优先从新的规则读取
|
||||
String uri = fetchProperty(TYPE_MONGO, "uri");
|
||||
String uri = fetchProperty(environment, TYPE_MONGO, KEY_URI);
|
||||
if (StringUtils.isNotEmpty(uri)) {
|
||||
return uri;
|
||||
}
|
||||
@ -136,9 +136,32 @@ public class GridFsService extends AbstractDFsService implements InitializingBea
|
||||
}
|
||||
|
||||
ConnectionString connectionString = new ConnectionString(uri);
|
||||
MongoClient mongoClient = MongoClients.create(connectionString);
|
||||
mongoClient = MongoClients.create(connectionString);
|
||||
db = mongoClient.getDatabase(Optional.ofNullable(connectionString.getDatabase()).orElse("pj"));
|
||||
|
||||
log.info("[GridFsService] turn on mongodb GridFs as storage layer.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
mongoClient.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init(ApplicationContext applicationContext) {
|
||||
String uri = parseMongoUri(applicationContext.getEnvironment());
|
||||
initMongo(uri);
|
||||
}
|
||||
|
||||
public static class GridFsCondition extends PropertyAndOneBeanCondition {
|
||||
@Override
|
||||
protected List<String> anyConfigKey() {
|
||||
return Lists.newArrayList("spring.data.mongodb.uri", "oms.storage.dfs.mongo.uri");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<?> beanType() {
|
||||
return DFsService.class;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,13 @@
|
||||
package tech.powerjob.server.persistence.storage.impl;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
/**
|
||||
* test GridFS
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2023/7/30
|
||||
*/
|
||||
class GridFsServiceTest {
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user