mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: [storageExt] MySqlSeriesDfsService
This commit is contained in:
parent
c50a3edebf
commit
37a62549db
@ -8,9 +8,12 @@ import com.fasterxml.jackson.databind.json.JsonMapper;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||||
|
import tech.powerjob.common.exception.ImpossibleException;
|
||||||
import tech.powerjob.common.exception.PowerJobException;
|
import tech.powerjob.common.exception.PowerJobException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* JSON工具类
|
* JSON工具类
|
||||||
@ -27,6 +30,8 @@ public class JsonUtils {
|
|||||||
.configure(JsonParser.Feature.IGNORE_UNDEFINED, true)
|
.configure(JsonParser.Feature.IGNORE_UNDEFINED, true)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>> () {};
|
||||||
|
|
||||||
private JsonUtils(){
|
private JsonUtils(){
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -67,6 +72,18 @@ public class JsonUtils {
|
|||||||
return JSON_MAPPER.readValue(json, clz);
|
return JSON_MAPPER.readValue(json, clz);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Map<String, Object> parseMap(String json) {
|
||||||
|
if (StringUtils.isEmpty(json)) {
|
||||||
|
return new HashMap<>();
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return JSON_MAPPER.readValue(json, MAP_TYPE_REFERENCE);
|
||||||
|
} catch (Exception e) {
|
||||||
|
ExceptionUtils.rethrow(e);
|
||||||
|
}
|
||||||
|
throw new ImpossibleException();
|
||||||
|
}
|
||||||
|
|
||||||
public static <T> T parseObject(byte[] b, Class<T> clz) throws IOException {
|
public static <T> T parseObject(byte[] b, Class<T> clz) throws IOException {
|
||||||
return JSON_MAPPER.readValue(b, clz);
|
return JSON_MAPPER.readValue(b, clz);
|
||||||
}
|
}
|
||||||
|
@ -56,6 +56,21 @@ public class NetUtils {
|
|||||||
return ThreadLocalRandom.current().nextInt(RND_PORT_START, RND_PORT_END);
|
return ThreadLocalRandom.current().nextInt(RND_PORT_START, RND_PORT_END);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 检测某个 IP 端口是否可用
|
||||||
|
* @param ip IP
|
||||||
|
* @param port 端口
|
||||||
|
* @return 是否可用
|
||||||
|
*/
|
||||||
|
public static boolean checkIpPortAvailable(String ip, int port) {
|
||||||
|
try (Socket socket = new Socket()) {
|
||||||
|
socket.connect(new InetSocketAddress(ip, port), 1000);
|
||||||
|
return true;
|
||||||
|
} catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取本机 IP 地址
|
* 获取本机 IP 地址
|
||||||
*
|
*
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package tech.powerjob.server.extension.dfs;
|
package tech.powerjob.server.extension.dfs;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
import lombok.experimental.Accessors;
|
import lombok.experimental.Accessors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -9,7 +10,8 @@ import lombok.experimental.Accessors;
|
|||||||
* @author tjq
|
* @author tjq
|
||||||
* @since 2023/7/16
|
* @since 2023/7/16
|
||||||
*/
|
*/
|
||||||
@Data
|
@Getter
|
||||||
|
@Setter
|
||||||
@Accessors(chain = true)
|
@Accessors(chain = true)
|
||||||
public class FileLocation {
|
public class FileLocation {
|
||||||
|
|
||||||
@ -22,4 +24,9 @@ public class FileLocation {
|
|||||||
* 名称
|
* 名称
|
||||||
*/
|
*/
|
||||||
private String name;
|
private String name;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s.%s", bucket, name);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,8 @@ import tech.powerjob.server.extension.dfs.DFsService;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean {
|
public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean {
|
||||||
|
|
||||||
|
protected ApplicationContext applicationContext;
|
||||||
|
|
||||||
public AbstractDFsService() {
|
public AbstractDFsService() {
|
||||||
log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName());
|
log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName());
|
||||||
}
|
}
|
||||||
@ -32,6 +34,7 @@ public abstract class AbstractDFsService implements DFsService, ApplicationConte
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||||
|
this.applicationContext = applicationContext;
|
||||||
log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName());
|
log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName());
|
||||||
init(applicationContext);
|
init(applicationContext);
|
||||||
}
|
}
|
||||||
|
@ -1,68 +1,228 @@
|
|||||||
package tech.powerjob.server.persistence.storage.impl;
|
package tech.powerjob.server.persistence.storage.impl;
|
||||||
|
|
||||||
|
import com.google.common.base.Stopwatch;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
import com.zaxxer.hikari.HikariConfig;
|
import com.zaxxer.hikari.HikariConfig;
|
||||||
import com.zaxxer.hikari.HikariDataSource;
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.experimental.Accessors;
|
import lombok.experimental.Accessors;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||||
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.annotation.Conditional;
|
||||||
import org.springframework.core.env.Environment;
|
import org.springframework.core.env.Environment;
|
||||||
import tech.powerjob.server.extension.dfs.DownloadRequest;
|
import tech.powerjob.common.serialize.JsonUtils;
|
||||||
import tech.powerjob.server.extension.dfs.FileLocation;
|
import tech.powerjob.server.common.constants.SwitchableStatus;
|
||||||
import tech.powerjob.server.extension.dfs.FileMeta;
|
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
|
||||||
import tech.powerjob.server.extension.dfs.StoreRequest;
|
import tech.powerjob.server.extension.dfs.*;
|
||||||
import tech.powerjob.server.persistence.storage.AbstractDFsService;
|
import tech.powerjob.server.persistence.storage.AbstractDFsService;
|
||||||
|
|
||||||
|
import javax.annotation.Priority;
|
||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
|
import java.io.BufferedInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.sql.*;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* MySQL 特性类似的数据库存储
|
* MySQL 特性类似的数据库存储
|
||||||
|
* PS1. 大文件上传可能会报 max_allowed_packet 不足,可根据参数放开数据库限制 set global max_allowed_packet = 500*1024*1024
|
||||||
|
* PS1. 官方基于 MySQL 测试,其他数据库使用前请自测,敬请谅解!
|
||||||
|
* PS2. 数据库并不适合大规模的文件存储,该扩展仅适用于简单业务,大型业务场景请选择其他存储方案(OSS、MongoDB等)
|
||||||
|
* ********************* 配置项 *********************
|
||||||
|
* oms.storage.dfs.mysql_series.driver
|
||||||
|
* oms.storage.dfs.mysql_series.url
|
||||||
|
* oms.storage.dfs.mysql_series.username
|
||||||
|
* oms.storage.dfs.mysql_series.password
|
||||||
|
* oms.storage.dfs.mysql_series.auto_create_table
|
||||||
|
* oms.storage.dfs.mysql_series.table_name
|
||||||
*
|
*
|
||||||
* @author tjq
|
* @author tjq
|
||||||
* @since 2023/8/9
|
* @since 2023/8/9
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
@Priority(value = Integer.MAX_VALUE - 2)
|
||||||
|
@Conditional(MySqlSeriesDfsService.MySqlSeriesCondition.class)
|
||||||
public class MySqlSeriesDfsService extends AbstractDFsService {
|
public class MySqlSeriesDfsService extends AbstractDFsService {
|
||||||
|
|
||||||
private DataSource dataSource;
|
private DataSource dataSource;
|
||||||
|
|
||||||
private static final String TYPE_MYSQL = "mysql_series";
|
private static final String TYPE_MYSQL = "mysql_series";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 数据库驱动,MYSQL8 为 com.mysql.cj.jdbc.Driver
|
||||||
|
*/
|
||||||
private static final String KEY_DRIVER_NAME = "driver";
|
private static final String KEY_DRIVER_NAME = "driver";
|
||||||
|
/**
|
||||||
|
* 数据库地址,比如 jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
|
||||||
|
*/
|
||||||
private static final String KEY_URL = "url";
|
private static final String KEY_URL = "url";
|
||||||
|
/**
|
||||||
|
* 数据库账号,比如 root
|
||||||
|
*/
|
||||||
private static final String KEY_USERNAME = "username";
|
private static final String KEY_USERNAME = "username";
|
||||||
|
/**
|
||||||
|
* 数据库密码
|
||||||
|
*/
|
||||||
private static final String KEY_PASSWORD = "password";
|
private static final String KEY_PASSWORD = "password";
|
||||||
|
/**
|
||||||
|
* 是否自动建表
|
||||||
|
*/
|
||||||
|
private static final String KEY_AUTO_CREATE_TABLE = "auto_create_table";
|
||||||
|
/**
|
||||||
|
* 表名
|
||||||
|
*/
|
||||||
|
private static final String KEY_TABLE_NAME = "table_name";
|
||||||
|
|
||||||
|
/* ********************* SQL region ********************* */
|
||||||
|
|
||||||
|
private static final String DEFAULT_TABLE_NAME = "powerjob_files";
|
||||||
|
|
||||||
private static final String CREATE_TABLE_SQL = "CREATE TABLE\n" +
|
private static final String CREATE_TABLE_SQL = "CREATE TABLE\n" +
|
||||||
"IF\n" +
|
"IF\n" +
|
||||||
"\tNOT EXISTS powerjob_files (\n" +
|
"\tNOT EXISTS %s (\n" +
|
||||||
"\t\t`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT 'ID',\n" +
|
"\t\t`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT 'ID',\n" +
|
||||||
|
"\t\t`bucket` VARCHAR ( 255 ) NOT NULL COMMENT '分桶',\n" +
|
||||||
|
"\t\t`name` VARCHAR ( 255 ) NOT NULL COMMENT '文件名称',\n" +
|
||||||
|
"\t\t`version` VARCHAR ( 255 ) NOT NULL COMMENT '版本',\n" +
|
||||||
|
"\t\t`meta` VARCHAR ( 255 ) COMMENT '元数据',\n" +
|
||||||
|
"\t\t`length` BIGINT NOT NULL COMMENT '长度',\n" +
|
||||||
|
"\t\t`status` INT NOT NULL COMMENT '状态',\n" +
|
||||||
|
"\t\t`data` LONGBLOB NOT NULL COMMENT '文件内容',\n" +
|
||||||
|
"\t\t`extra` VARCHAR ( 255 ) COMMENT '其他信息',\n" +
|
||||||
"\t\t`gmt_create` DATETIME NOT NULL COMMENT '创建时间',\n" +
|
"\t\t`gmt_create` DATETIME NOT NULL COMMENT '创建时间',\n" +
|
||||||
"\t\t`gmt_modified` DATETIME COMMENT '更新时间',\n" +
|
"\t\t`gmt_modified` DATETIME COMMENT '更新时间',\n" +
|
||||||
"\t\t`name` VARCHAR ( 255 ) NOT NULL COMMENT '文件名称',\n" +
|
|
||||||
"\t\t`bucket` VARCHAR ( 255 ) NOT NULL COMMENT '分桶',\n" +
|
|
||||||
"\t\t`extra` VARCHAR ( 255 ) NOT NULL COMMENT '其他信息',\n" +
|
|
||||||
"\t\t`version` VARCHAR ( 255 ) NOT NULL COMMENT '版本',\n" +
|
|
||||||
"\t\t`data` LONGBLOB NOT NULL COMMENT '文件内容',\n" +
|
|
||||||
"\tPRIMARY KEY ( id ) \n" +
|
"\tPRIMARY KEY ( id ) \n" +
|
||||||
"\t);";
|
"\t);";
|
||||||
|
|
||||||
|
private static final String INSERT_SQL = "insert into %s(bucket, name, version, meta, length, status, data, extra, gmt_create, gmt_modified) values (?,?,?,?,?,?,?,?,?,?);";
|
||||||
|
|
||||||
|
private static final String DELETE_SQL = "DELETE FROM %s";
|
||||||
|
|
||||||
|
private static final String QUERY_FULL_SQL = "select * from %s";
|
||||||
|
|
||||||
|
private static final String QUERY_META_SQL = "select bucket, name, version, meta, length, status, extra, gmt_create, gmt_modified from %s";
|
||||||
|
|
||||||
|
private void deleteByLocation(FileLocation fileLocation) {
|
||||||
|
String dSQLPrefix = fullSQL(DELETE_SQL);
|
||||||
|
|
||||||
|
try (Connection con = dataSource.getConnection()) {
|
||||||
|
|
||||||
|
String dSQL = dSQLPrefix.concat(whereSQL(fileLocation));
|
||||||
|
|
||||||
|
con.createStatement().executeUpdate(dSQL);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[MySqlSeriesDfsService] deleteByLocation [{}] failed!", fileLocation);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void store(StoreRequest storeRequest) throws IOException {
|
public void store(StoreRequest storeRequest) throws IOException {
|
||||||
|
|
||||||
|
Stopwatch sw = Stopwatch.createStarted();
|
||||||
|
String insertSQL = fullSQL(INSERT_SQL);
|
||||||
|
|
||||||
|
FileLocation fileLocation = storeRequest.getFileLocation();
|
||||||
|
|
||||||
|
// 覆盖写,写之前先删除
|
||||||
|
deleteByLocation(fileLocation);
|
||||||
|
|
||||||
|
Map<String, Object> meta = Maps.newHashMap();
|
||||||
|
meta.put("_local_file_path_", storeRequest.getLocalFile().getAbsolutePath());
|
||||||
|
|
||||||
|
Date date = new Date(System.currentTimeMillis());
|
||||||
|
|
||||||
|
try (Connection con = dataSource.getConnection()) {
|
||||||
|
PreparedStatement pst = con.prepareStatement(insertSQL);
|
||||||
|
|
||||||
|
pst.setString(1, fileLocation.getBucket());
|
||||||
|
pst.setString(2, fileLocation.getName());
|
||||||
|
pst.setString(3, "mu");
|
||||||
|
pst.setString(4, JsonUtils.toJSONString(meta));
|
||||||
|
pst.setLong(5, storeRequest.getLocalFile().length());
|
||||||
|
pst.setInt(6, SwitchableStatus.ENABLE.getV());
|
||||||
|
pst.setBlob(7, new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath())));
|
||||||
|
pst.setString(8, null);
|
||||||
|
pst.setDate(9, date);
|
||||||
|
pst.setDate(10, date);
|
||||||
|
|
||||||
|
pst.execute();
|
||||||
|
|
||||||
|
log.info("[MySqlSeriesDfsService] store [{}] successfully, cost: {}", fileLocation, sw);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[MySqlSeriesDfsService] store [{}] failed!", fileLocation);
|
||||||
|
ExceptionUtils.rethrow(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void download(DownloadRequest downloadRequest) throws IOException {
|
public void download(DownloadRequest downloadRequest) throws IOException {
|
||||||
|
|
||||||
|
Stopwatch sw = Stopwatch.createStarted();
|
||||||
|
String querySQL = fullSQL(QUERY_FULL_SQL);
|
||||||
|
|
||||||
|
FileLocation fileLocation = downloadRequest.getFileLocation();
|
||||||
|
|
||||||
|
FileUtils.forceMkdirParent(downloadRequest.getTarget());
|
||||||
|
|
||||||
|
try (Connection con = dataSource.getConnection()) {
|
||||||
|
|
||||||
|
ResultSet resultSet = con.createStatement().executeQuery(querySQL.concat(whereSQL(fileLocation)));
|
||||||
|
|
||||||
|
boolean exist = resultSet.next();
|
||||||
|
|
||||||
|
if (!exist) {
|
||||||
|
log.warn("[MySqlSeriesDfsService] download file[{}] failed due to not exits!", fileLocation);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Blob dataBlob = resultSet.getBlob("data");
|
||||||
|
FileUtils.copyInputStreamToFile(new BufferedInputStream(dataBlob.getBinaryStream()), downloadRequest.getTarget());
|
||||||
|
|
||||||
|
log.info("[MySqlSeriesDfsService] download [{}] successfully, cost: {}", fileLocation, sw);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[MySqlSeriesDfsService] download file [{}] failed!", fileLocation, e);
|
||||||
|
ExceptionUtils.rethrow(e);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
|
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
|
||||||
|
|
||||||
|
String querySQL = fullSQL(QUERY_META_SQL);
|
||||||
|
|
||||||
|
try (Connection con = dataSource.getConnection()) {
|
||||||
|
|
||||||
|
ResultSet resultSet = con.createStatement().executeQuery(querySQL.concat(whereSQL(fileLocation)));
|
||||||
|
|
||||||
|
boolean exist = resultSet.next();
|
||||||
|
|
||||||
|
if (!exist) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
FileMeta fileMeta = new FileMeta()
|
||||||
|
.setLength(resultSet.getLong("length"))
|
||||||
|
.setLastModifiedTime(resultSet.getDate("gmt_modified"))
|
||||||
|
.setMetaInfo(JsonUtils.parseMap(resultSet.getString("meta")));
|
||||||
|
return Optional.of(fileMeta);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[MySqlSeriesDfsService] fetchFileMeta [{}] failed!", fileLocation);
|
||||||
|
ExceptionUtils.rethrow(e);
|
||||||
|
}
|
||||||
|
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,7 +235,9 @@ public class MySqlSeriesDfsService extends AbstractDFsService {
|
|||||||
.setDriver(fetchProperty(env, TYPE_MYSQL, KEY_DRIVER_NAME))
|
.setDriver(fetchProperty(env, TYPE_MYSQL, KEY_DRIVER_NAME))
|
||||||
.setUrl(fetchProperty(env, TYPE_MYSQL, KEY_URL))
|
.setUrl(fetchProperty(env, TYPE_MYSQL, KEY_URL))
|
||||||
.setUsername(fetchProperty(env, TYPE_MYSQL, KEY_USERNAME))
|
.setUsername(fetchProperty(env, TYPE_MYSQL, KEY_USERNAME))
|
||||||
.setPassword(fetchProperty(env, TYPE_MYSQL, KEY_PASSWORD));
|
.setPassword(fetchProperty(env, TYPE_MYSQL, KEY_PASSWORD))
|
||||||
|
.setAutoCreateTable(Boolean.TRUE.toString().equalsIgnoreCase(fetchProperty(env, TYPE_MYSQL, KEY_AUTO_CREATE_TABLE)))
|
||||||
|
;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
initDatabase(mySQLProperty);
|
initDatabase(mySQLProperty);
|
||||||
@ -86,7 +248,7 @@ public class MySqlSeriesDfsService extends AbstractDFsService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initDatabase(MySQLProperty property) {
|
void initDatabase(MySQLProperty property) {
|
||||||
|
|
||||||
log.info("[MySqlSeriesDfsService] init datasource by config: {}", property);
|
log.info("[MySqlSeriesDfsService] init datasource by config: {}", property);
|
||||||
|
|
||||||
@ -106,8 +268,35 @@ public class MySqlSeriesDfsService extends AbstractDFsService {
|
|||||||
dataSource = new HikariDataSource(config);
|
dataSource = new HikariDataSource(config);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initTable(MySQLProperty property) throws Exception {
|
void initTable(MySQLProperty property) throws Exception {
|
||||||
dataSource.getConnection().createStatement().execute(CREATE_TABLE_SQL);
|
|
||||||
|
if (property.autoCreateTable) {
|
||||||
|
|
||||||
|
String createTableSQL = fullSQL(CREATE_TABLE_SQL);
|
||||||
|
|
||||||
|
log.info("[MySqlSeriesDfsService] use create table SQL: {}", createTableSQL);
|
||||||
|
try (Connection connection = dataSource.getConnection()) {
|
||||||
|
connection.createStatement().execute(createTableSQL);
|
||||||
|
log.info("[MySqlSeriesDfsService] auto create table successfully!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String fullSQL(String sql) {
|
||||||
|
return String.format(sql, parseTableName());
|
||||||
|
}
|
||||||
|
|
||||||
|
private String parseTableName() {
|
||||||
|
// 误删,兼容本地 unit test
|
||||||
|
if (applicationContext == null) {
|
||||||
|
return DEFAULT_TABLE_NAME;
|
||||||
|
}
|
||||||
|
String tableName = fetchProperty(applicationContext.getEnvironment(), TYPE_MYSQL, KEY_TABLE_NAME);
|
||||||
|
return StringUtils.isEmpty(tableName) ? DEFAULT_TABLE_NAME : tableName;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String whereSQL(FileLocation fileLocation) {
|
||||||
|
return String.format(" where bucket='%s' AND name='%s' ", fileLocation.getBucket(), fileLocation.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -121,16 +310,19 @@ public class MySqlSeriesDfsService extends AbstractDFsService {
|
|||||||
private String url;
|
private String url;
|
||||||
private String username;
|
private String username;
|
||||||
private String password;
|
private String password;
|
||||||
|
|
||||||
|
private boolean autoCreateTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static class MySqlSeriesCondition extends PropertyAndOneBeanCondition {
|
||||||
MySQLProperty mySQLProperty = new MySQLProperty()
|
@Override
|
||||||
.setDriver("com.mysql.cj.jdbc.Driver")
|
protected List<String> anyConfigKey() {
|
||||||
.setUrl("jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
|
return Lists.newArrayList("oms.storage.dfs.mysql_series.url");
|
||||||
.setUsername("root")
|
}
|
||||||
.setPassword("No1Bug2Please3!");
|
|
||||||
MySqlSeriesDfsService mySqlSeriesDfsService = new MySqlSeriesDfsService();
|
@Override
|
||||||
mySqlSeriesDfsService.initDatabase(mySQLProperty);
|
protected Class<?> beanType() {
|
||||||
mySqlSeriesDfsService.initTable(mySQLProperty);
|
return DFsService.class;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@ import tech.powerjob.server.extension.dfs.*;
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* AbstractDfsServiceTest
|
* AbstractDfsServiceTest
|
||||||
@ -32,7 +33,7 @@ public abstract class AbstractDfsServiceTest {
|
|||||||
|
|
||||||
DFsService aliOssService = aliOssServiceOpt.get();
|
DFsService aliOssService = aliOssServiceOpt.get();
|
||||||
|
|
||||||
String content = "wlcgyqsl";
|
String content = "wlcgyqsl".concat(String.valueOf(ThreadLocalRandom.current().nextLong()));
|
||||||
|
|
||||||
String temporarySourcePath = OmsFileUtils.genTemporaryWorkPath() + "source.txt";
|
String temporarySourcePath = OmsFileUtils.genTemporaryWorkPath() + "source.txt";
|
||||||
String temporaryDownloadPath = OmsFileUtils.genTemporaryWorkPath() + "download.txt";
|
String temporaryDownloadPath = OmsFileUtils.genTemporaryWorkPath() + "download.txt";
|
||||||
@ -42,7 +43,7 @@ public abstract class AbstractDfsServiceTest {
|
|||||||
FileUtils.forceMkdirParent(sourceFile);
|
FileUtils.forceMkdirParent(sourceFile);
|
||||||
OmsFileUtils.string2File(content, sourceFile);
|
OmsFileUtils.string2File(content, sourceFile);
|
||||||
|
|
||||||
FileLocation fileLocation = new FileLocation().setBucket("pj_test").setName("testAliOss.txt");
|
FileLocation fileLocation = new FileLocation().setBucket("pj_test").setName(String.format("test_%d.txt", ThreadLocalRandom.current().nextLong()));
|
||||||
|
|
||||||
StoreRequest storeRequest = new StoreRequest()
|
StoreRequest storeRequest = new StoreRequest()
|
||||||
.setFileLocation(fileLocation)
|
.setFileLocation(fileLocation)
|
||||||
|
@ -0,0 +1,42 @@
|
|||||||
|
package tech.powerjob.server.persistence.storage.impl;
|
||||||
|
|
||||||
|
import tech.powerjob.common.utils.NetUtils;
|
||||||
|
import tech.powerjob.server.extension.dfs.DFsService;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MySqlSeriesDfsServiceTest
|
||||||
|
*
|
||||||
|
* @author tjq
|
||||||
|
* @since 2023/8/10
|
||||||
|
*/
|
||||||
|
class MySqlSeriesDfsServiceTest extends AbstractDfsServiceTest {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Optional<DFsService> fetchService() {
|
||||||
|
|
||||||
|
boolean dbAvailable = NetUtils.checkIpPortAvailable("127.0.0.1", 3306);
|
||||||
|
if (dbAvailable) {
|
||||||
|
MySqlSeriesDfsService mySqlSeriesDfsService = new MySqlSeriesDfsService();
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
MySqlSeriesDfsService.MySQLProperty mySQLProperty = new MySqlSeriesDfsService.MySQLProperty()
|
||||||
|
.setDriver("com.mysql.cj.jdbc.Driver")
|
||||||
|
.setUrl("jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
|
||||||
|
.setUsername("root")
|
||||||
|
.setAutoCreateTable(true)
|
||||||
|
.setPassword("No1Bug2Please3!");
|
||||||
|
mySqlSeriesDfsService.initDatabase(mySQLProperty);
|
||||||
|
mySqlSeriesDfsService.initTable(mySQLProperty);
|
||||||
|
|
||||||
|
return Optional.of(mySqlSeriesDfsService);
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user