diff --git a/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java index 085da18c..0480357a 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java @@ -8,9 +8,12 @@ import com.fasterxml.jackson.databind.json.JsonMapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import tech.powerjob.common.exception.ImpossibleException; import tech.powerjob.common.exception.PowerJobException; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; /** * JSON工具类 @@ -27,6 +30,8 @@ public class JsonUtils { .configure(JsonParser.Feature.IGNORE_UNDEFINED, true) .build(); + private static final TypeReference> MAP_TYPE_REFERENCE = new TypeReference> () {}; + private JsonUtils(){ } @@ -67,6 +72,18 @@ public class JsonUtils { return JSON_MAPPER.readValue(json, clz); } + public static Map parseMap(String json) { + if (StringUtils.isEmpty(json)) { + return new HashMap<>(); + } + try { + return JSON_MAPPER.readValue(json, MAP_TYPE_REFERENCE); + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + throw new ImpossibleException(); + } + public static T parseObject(byte[] b, Class clz) throws IOException { return JSON_MAPPER.readValue(b, clz); } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java index 9a0d88a3..b6a3fe93 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java @@ -56,6 +56,21 @@ public class NetUtils { return ThreadLocalRandom.current().nextInt(RND_PORT_START, RND_PORT_END); } + /** + * 检测某个 IP 端口是否可用 + * @param ip IP + * @param port 端口 + * @return 是否可用 + */ + public static boolean checkIpPortAvailable(String ip, int port) { + try (Socket socket = new Socket()) { + socket.connect(new InetSocketAddress(ip, port), 1000); + return true; + } catch (Exception e) { + return false; + } + } + /** * 获取本机 IP 地址 * diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileLocation.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileLocation.java index 13132876..0eda207d 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileLocation.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileLocation.java @@ -1,6 +1,7 @@ package tech.powerjob.server.extension.dfs; -import lombok.Data; +import lombok.Getter; +import lombok.Setter; import lombok.experimental.Accessors; /** @@ -9,7 +10,8 @@ import lombok.experimental.Accessors; * @author tjq * @since 2023/7/16 */ -@Data +@Getter +@Setter @Accessors(chain = true) public class FileLocation { @@ -22,4 +24,9 @@ public class FileLocation { * 名称 */ private String name; + + @Override + public String toString() { + return String.format("%s.%s", bucket, name); + } } diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java index 4ce3c72e..edb52a8c 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java @@ -17,6 +17,8 @@ import tech.powerjob.server.extension.dfs.DFsService; @Slf4j public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean { + protected ApplicationContext applicationContext; + public AbstractDFsService() { log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName()); } @@ -32,6 +34,7 @@ public abstract class AbstractDFsService implements DFsService, ApplicationConte @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName()); init(applicationContext); } diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java index 744b79c0..4b3ea930 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java @@ -1,68 +1,228 @@ package tech.powerjob.server.persistence.storage.impl; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import lombok.Data; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Conditional; import org.springframework.core.env.Environment; -import tech.powerjob.server.extension.dfs.DownloadRequest; -import tech.powerjob.server.extension.dfs.FileLocation; -import tech.powerjob.server.extension.dfs.FileMeta; -import tech.powerjob.server.extension.dfs.StoreRequest; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition; +import tech.powerjob.server.extension.dfs.*; import tech.powerjob.server.persistence.storage.AbstractDFsService; +import javax.annotation.Priority; import javax.sql.DataSource; +import java.io.BufferedInputStream; import java.io.IOException; +import java.nio.file.Files; +import java.sql.*; +import java.util.List; +import java.util.Map; import java.util.Optional; /** * MySQL 特性类似的数据库存储 + * PS1. 大文件上传可能会报 max_allowed_packet 不足,可根据参数放开数据库限制 set global max_allowed_packet = 500*1024*1024 + * PS1. 官方基于 MySQL 测试,其他数据库使用前请自测,敬请谅解! + * PS2. 数据库并不适合大规模的文件存储,该扩展仅适用于简单业务,大型业务场景请选择其他存储方案(OSS、MongoDB等) + * ********************* 配置项 ********************* + * oms.storage.dfs.mysql_series.driver + * oms.storage.dfs.mysql_series.url + * oms.storage.dfs.mysql_series.username + * oms.storage.dfs.mysql_series.password + * oms.storage.dfs.mysql_series.auto_create_table + * oms.storage.dfs.mysql_series.table_name * * @author tjq * @since 2023/8/9 */ @Slf4j +@Priority(value = Integer.MAX_VALUE - 2) +@Conditional(MySqlSeriesDfsService.MySqlSeriesCondition.class) public class MySqlSeriesDfsService extends AbstractDFsService { private DataSource dataSource; private static final String TYPE_MYSQL = "mysql_series"; + /** + * 数据库驱动,MYSQL8 为 com.mysql.cj.jdbc.Driver + */ private static final String KEY_DRIVER_NAME = "driver"; + /** + * 数据库地址,比如 jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai + */ private static final String KEY_URL = "url"; + /** + * 数据库账号,比如 root + */ private static final String KEY_USERNAME = "username"; - + /** + * 数据库密码 + */ private static final String KEY_PASSWORD = "password"; + /** + * 是否自动建表 + */ + private static final String KEY_AUTO_CREATE_TABLE = "auto_create_table"; + /** + * 表名 + */ + private static final String KEY_TABLE_NAME = "table_name"; + + /* ********************* SQL region ********************* */ + + private static final String DEFAULT_TABLE_NAME = "powerjob_files"; private static final String CREATE_TABLE_SQL = "CREATE TABLE\n" + "IF\n" + - "\tNOT EXISTS powerjob_files (\n" + + "\tNOT EXISTS %s (\n" + "\t\t`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT 'ID',\n" + + "\t\t`bucket` VARCHAR ( 255 ) NOT NULL COMMENT '分桶',\n" + + "\t\t`name` VARCHAR ( 255 ) NOT NULL COMMENT '文件名称',\n" + + "\t\t`version` VARCHAR ( 255 ) NOT NULL COMMENT '版本',\n" + + "\t\t`meta` VARCHAR ( 255 ) COMMENT '元数据',\n" + + "\t\t`length` BIGINT NOT NULL COMMENT '长度',\n" + + "\t\t`status` INT NOT NULL COMMENT '状态',\n" + + "\t\t`data` LONGBLOB NOT NULL COMMENT '文件内容',\n" + + "\t\t`extra` VARCHAR ( 255 ) COMMENT '其他信息',\n" + "\t\t`gmt_create` DATETIME NOT NULL COMMENT '创建时间',\n" + "\t\t`gmt_modified` DATETIME COMMENT '更新时间',\n" + - "\t\t`name` VARCHAR ( 255 ) NOT NULL COMMENT '文件名称',\n" + - "\t\t`bucket` VARCHAR ( 255 ) NOT NULL COMMENT '分桶',\n" + - "\t\t`extra` VARCHAR ( 255 ) NOT NULL COMMENT '其他信息',\n" + - "\t\t`version` VARCHAR ( 255 ) NOT NULL COMMENT '版本',\n" + - "\t\t`data` LONGBLOB NOT NULL COMMENT '文件内容',\n" + "\tPRIMARY KEY ( id ) \n" + "\t);"; + private static final String INSERT_SQL = "insert into %s(bucket, name, version, meta, length, status, data, extra, gmt_create, gmt_modified) values (?,?,?,?,?,?,?,?,?,?);"; + + private static final String DELETE_SQL = "DELETE FROM %s"; + + private static final String QUERY_FULL_SQL = "select * from %s"; + + private static final String QUERY_META_SQL = "select bucket, name, version, meta, length, status, extra, gmt_create, gmt_modified from %s"; + + private void deleteByLocation(FileLocation fileLocation) { + String dSQLPrefix = fullSQL(DELETE_SQL); + + try (Connection con = dataSource.getConnection()) { + + String dSQL = dSQLPrefix.concat(whereSQL(fileLocation)); + + con.createStatement().executeUpdate(dSQL); + + } catch (Exception e) { + log.error("[MySqlSeriesDfsService] deleteByLocation [{}] failed!", fileLocation); + } + + } + @Override public void store(StoreRequest storeRequest) throws IOException { + Stopwatch sw = Stopwatch.createStarted(); + String insertSQL = fullSQL(INSERT_SQL); + + FileLocation fileLocation = storeRequest.getFileLocation(); + + // 覆盖写,写之前先删除 + deleteByLocation(fileLocation); + + Map meta = Maps.newHashMap(); + meta.put("_local_file_path_", storeRequest.getLocalFile().getAbsolutePath()); + + Date date = new Date(System.currentTimeMillis()); + + try (Connection con = dataSource.getConnection()) { + PreparedStatement pst = con.prepareStatement(insertSQL); + + pst.setString(1, fileLocation.getBucket()); + pst.setString(2, fileLocation.getName()); + pst.setString(3, "mu"); + pst.setString(4, JsonUtils.toJSONString(meta)); + pst.setLong(5, storeRequest.getLocalFile().length()); + pst.setInt(6, SwitchableStatus.ENABLE.getV()); + pst.setBlob(7, new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath()))); + pst.setString(8, null); + pst.setDate(9, date); + pst.setDate(10, date); + + pst.execute(); + + log.info("[MySqlSeriesDfsService] store [{}] successfully, cost: {}", fileLocation, sw); + + } catch (Exception e) { + log.error("[MySqlSeriesDfsService] store [{}] failed!", fileLocation); + ExceptionUtils.rethrow(e); + } } @Override public void download(DownloadRequest downloadRequest) throws IOException { + Stopwatch sw = Stopwatch.createStarted(); + String querySQL = fullSQL(QUERY_FULL_SQL); + + FileLocation fileLocation = downloadRequest.getFileLocation(); + + FileUtils.forceMkdirParent(downloadRequest.getTarget()); + + try (Connection con = dataSource.getConnection()) { + + ResultSet resultSet = con.createStatement().executeQuery(querySQL.concat(whereSQL(fileLocation))); + + boolean exist = resultSet.next(); + + if (!exist) { + log.warn("[MySqlSeriesDfsService] download file[{}] failed due to not exits!", fileLocation); + return; + } + + Blob dataBlob = resultSet.getBlob("data"); + FileUtils.copyInputStreamToFile(new BufferedInputStream(dataBlob.getBinaryStream()), downloadRequest.getTarget()); + + log.info("[MySqlSeriesDfsService] download [{}] successfully, cost: {}", fileLocation, sw); + + } catch (Exception e) { + log.error("[MySqlSeriesDfsService] download file [{}] failed!", fileLocation, e); + ExceptionUtils.rethrow(e); + } + } @Override public Optional fetchFileMeta(FileLocation fileLocation) throws IOException { + + String querySQL = fullSQL(QUERY_META_SQL); + + try (Connection con = dataSource.getConnection()) { + + ResultSet resultSet = con.createStatement().executeQuery(querySQL.concat(whereSQL(fileLocation))); + + boolean exist = resultSet.next(); + + if (!exist) { + return Optional.empty(); + } + + FileMeta fileMeta = new FileMeta() + .setLength(resultSet.getLong("length")) + .setLastModifiedTime(resultSet.getDate("gmt_modified")) + .setMetaInfo(JsonUtils.parseMap(resultSet.getString("meta"))); + return Optional.of(fileMeta); + + } catch (Exception e) { + log.error("[MySqlSeriesDfsService] fetchFileMeta [{}] failed!", fileLocation); + ExceptionUtils.rethrow(e); + } + return Optional.empty(); } @@ -75,7 +235,9 @@ public class MySqlSeriesDfsService extends AbstractDFsService { .setDriver(fetchProperty(env, TYPE_MYSQL, KEY_DRIVER_NAME)) .setUrl(fetchProperty(env, TYPE_MYSQL, KEY_URL)) .setUsername(fetchProperty(env, TYPE_MYSQL, KEY_USERNAME)) - .setPassword(fetchProperty(env, TYPE_MYSQL, KEY_PASSWORD)); + .setPassword(fetchProperty(env, TYPE_MYSQL, KEY_PASSWORD)) + .setAutoCreateTable(Boolean.TRUE.toString().equalsIgnoreCase(fetchProperty(env, TYPE_MYSQL, KEY_AUTO_CREATE_TABLE))) + ; try { initDatabase(mySQLProperty); @@ -86,7 +248,7 @@ public class MySqlSeriesDfsService extends AbstractDFsService { } } - private void initDatabase(MySQLProperty property) { + void initDatabase(MySQLProperty property) { log.info("[MySqlSeriesDfsService] init datasource by config: {}", property); @@ -106,8 +268,35 @@ public class MySqlSeriesDfsService extends AbstractDFsService { dataSource = new HikariDataSource(config); } - private void initTable(MySQLProperty property) throws Exception { - dataSource.getConnection().createStatement().execute(CREATE_TABLE_SQL); + void initTable(MySQLProperty property) throws Exception { + + if (property.autoCreateTable) { + + String createTableSQL = fullSQL(CREATE_TABLE_SQL); + + log.info("[MySqlSeriesDfsService] use create table SQL: {}", createTableSQL); + try (Connection connection = dataSource.getConnection()) { + connection.createStatement().execute(createTableSQL); + log.info("[MySqlSeriesDfsService] auto create table successfully!"); + } + } + } + + private String fullSQL(String sql) { + return String.format(sql, parseTableName()); + } + + private String parseTableName() { + // 误删,兼容本地 unit test + if (applicationContext == null) { + return DEFAULT_TABLE_NAME; + } + String tableName = fetchProperty(applicationContext.getEnvironment(), TYPE_MYSQL, KEY_TABLE_NAME); + return StringUtils.isEmpty(tableName) ? DEFAULT_TABLE_NAME : tableName; + } + + private static String whereSQL(FileLocation fileLocation) { + return String.format(" where bucket='%s' AND name='%s' ", fileLocation.getBucket(), fileLocation.getName()); } @Override @@ -121,16 +310,19 @@ public class MySqlSeriesDfsService extends AbstractDFsService { private String url; private String username; private String password; + + private boolean autoCreateTable; } - public static void main(String[] args) throws Exception { - MySQLProperty mySQLProperty = new MySQLProperty() - .setDriver("com.mysql.cj.jdbc.Driver") - .setUrl("jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai") - .setUsername("root") - .setPassword("No1Bug2Please3!"); - MySqlSeriesDfsService mySqlSeriesDfsService = new MySqlSeriesDfsService(); - mySqlSeriesDfsService.initDatabase(mySQLProperty); - mySqlSeriesDfsService.initTable(mySQLProperty); + public static class MySqlSeriesCondition extends PropertyAndOneBeanCondition { + @Override + protected List anyConfigKey() { + return Lists.newArrayList("oms.storage.dfs.mysql_series.url"); + } + + @Override + protected Class beanType() { + return DFsService.class; + } } } diff --git a/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/AbstractDfsServiceTest.java b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/AbstractDfsServiceTest.java index bd15b482..8b5a4dc9 100644 --- a/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/AbstractDfsServiceTest.java +++ b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/AbstractDfsServiceTest.java @@ -10,6 +10,7 @@ import tech.powerjob.server.extension.dfs.*; import java.io.File; import java.nio.charset.StandardCharsets; import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; /** * AbstractDfsServiceTest @@ -32,7 +33,7 @@ public abstract class AbstractDfsServiceTest { DFsService aliOssService = aliOssServiceOpt.get(); - String content = "wlcgyqsl"; + String content = "wlcgyqsl".concat(String.valueOf(ThreadLocalRandom.current().nextLong())); String temporarySourcePath = OmsFileUtils.genTemporaryWorkPath() + "source.txt"; String temporaryDownloadPath = OmsFileUtils.genTemporaryWorkPath() + "download.txt"; @@ -42,7 +43,7 @@ public abstract class AbstractDfsServiceTest { FileUtils.forceMkdirParent(sourceFile); OmsFileUtils.string2File(content, sourceFile); - FileLocation fileLocation = new FileLocation().setBucket("pj_test").setName("testAliOss.txt"); + FileLocation fileLocation = new FileLocation().setBucket("pj_test").setName(String.format("test_%d.txt", ThreadLocalRandom.current().nextLong())); StoreRequest storeRequest = new StoreRequest() .setFileLocation(fileLocation) diff --git a/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsServiceTest.java b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsServiceTest.java new file mode 100644 index 00000000..d14116fe --- /dev/null +++ b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsServiceTest.java @@ -0,0 +1,42 @@ +package tech.powerjob.server.persistence.storage.impl; + +import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.server.extension.dfs.DFsService; + +import java.util.Optional; + +/** + * MySqlSeriesDfsServiceTest + * + * @author tjq + * @since 2023/8/10 + */ +class MySqlSeriesDfsServiceTest extends AbstractDfsServiceTest { + + @Override + protected Optional fetchService() { + + boolean dbAvailable = NetUtils.checkIpPortAvailable("127.0.0.1", 3306); + if (dbAvailable) { + MySqlSeriesDfsService mySqlSeriesDfsService = new MySqlSeriesDfsService(); + + try { + + MySqlSeriesDfsService.MySQLProperty mySQLProperty = new MySqlSeriesDfsService.MySQLProperty() + .setDriver("com.mysql.cj.jdbc.Driver") + .setUrl("jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai") + .setUsername("root") + .setAutoCreateTable(true) + .setPassword("No1Bug2Please3!"); + mySqlSeriesDfsService.initDatabase(mySQLProperty); + mySqlSeriesDfsService.initTable(mySQLProperty); + + return Optional.of(mySqlSeriesDfsService); + } catch (Exception e) { + e.printStackTrace(); + } + } + + return Optional.empty(); + } +} \ No newline at end of file