From b3abb461dba3edd422b8ec73803260bc40da849a Mon Sep 17 00:00:00 2001 From: Jetol <375978142@qq.com> Date: Fri, 8 Mar 2024 17:23:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0Postgresql=E5=AD=98=E5=82=A8?= =?UTF-8?q?=E6=89=A9=E5=B1=95=EF=BC=8C=E9=85=8D=E7=BD=AE=E9=A1=B9=E5=8F=82?= =?UTF-8?q?=E8=80=83=EF=BC=9Atech.powerjob.server.persistence.storage.impl?= =?UTF-8?q?.PostgresqlSeriesDfsService?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/extension/dfs/DFsService.java | 3 +- .../storage/StorageConfiguration.java | 6 + .../impl/PostgresqlSeriesDfsService.java | 407 ++++++++++++++++++ .../resources/application-daily.properties | 8 + 4 files changed, 423 insertions(+), 1 deletion(-) create mode 100644 powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/PostgresqlSeriesDfsService.java diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DFsService.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DFsService.java index fdb77b6b..b14c1c89 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DFsService.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DFsService.java @@ -1,6 +1,7 @@ package tech.powerjob.server.extension.dfs; import java.io.IOException; +import java.sql.SQLException; import java.util.Optional; /** @@ -16,7 +17,7 @@ public interface DFsService { * @param storeRequest 存储请求 * @throws IOException 异常 */ - void store(StoreRequest storeRequest) throws IOException; + void store(StoreRequest storeRequest) throws IOException, SQLException; /** * 下载文件 diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/StorageConfiguration.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/StorageConfiguration.java index 890fbeba..245cdb73 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/StorageConfiguration.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/StorageConfiguration.java @@ -27,6 +27,12 @@ public class StorageConfiguration { return new MySqlSeriesDfsService(); } + @Bean + @Conditional(PostgresqlSeriesDfsService.PostgresqlSeriesCondition.class) + public DFsService initPGDbFs() { + return new PostgresqlSeriesDfsService(); + } + @Bean @Conditional(AliOssService.AliOssCondition.class) public DFsService initAliOssFs() { diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/PostgresqlSeriesDfsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/PostgresqlSeriesDfsService.java new file mode 100644 index 00000000..4e7f4ebb --- /dev/null +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/PostgresqlSeriesDfsService.java @@ -0,0 +1,407 @@ +package tech.powerjob.server.persistence.storage.impl; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import lombok.Data; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.time.DateUtils; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Conditional; +import org.springframework.core.env.Environment; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition; +import tech.powerjob.server.extension.dfs.DFsService; +import tech.powerjob.server.extension.dfs.DownloadRequest; +import tech.powerjob.server.extension.dfs.FileLocation; +import tech.powerjob.server.extension.dfs.FileMeta; +import tech.powerjob.server.extension.dfs.StoreRequest; +import tech.powerjob.server.persistence.storage.AbstractDFsService; + +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Priority; +import javax.sql.DataSource; + +/** + * postgresql 数据库存储,使用的版本是14 + * ********************* 配置项 ********************* + * oms.storage.dfs.postgresql_series.driver + * oms.storage.dfs.postgresql_series.url + * oms.storage.dfs.postgresql_series.username + * oms.storage.dfs.postgresql_series.password + * oms.storage.dfs.postgresql_series.auto_create_table + * oms.storage.dfs.postgresql_series.table_name + * + * @author jetol + * @since 2024-1-8 + */ +@Slf4j +@Priority(value = Integer.MAX_VALUE - 4) +@Conditional(PostgresqlSeriesDfsService.PostgresqlSeriesCondition.class) +public class PostgresqlSeriesDfsService extends AbstractDFsService { + + private DataSource dataSource; + + private static final String TYPE_POSTGRESQL = "postgresql_series"; + + /** + * 数据库驱动,Postgresql 为 org.postgresql.Driver + */ + private static final String KEY_DRIVER_NAME = "driver"; + /** + * 数据库地址,比如 jdbc:postgresql://localhost:3306/powerjob-daily + */ + private static final String KEY_URL = "url"; + /** + * 数据库账号,比如 root + */ + private static final String KEY_USERNAME = "username"; + /** + * 数据库密码 + */ + private static final String KEY_PASSWORD = "password"; + /** + * 是否自动建表 + */ + private static final String KEY_AUTO_CREATE_TABLE = "auto_create_table"; + /** + * 表名 + */ + private static final String KEY_TABLE_NAME = "table_name"; + + /* ********************* SQL region ********************* */ + + private static final String DEFAULT_TABLE_NAME = "powerjob_files"; + + private static final String POWERJOB_FILES_ID_SEQ = "CREATE SEQUENCE powerjob_files_id_seq\n" + + " START WITH 1\n" + + " INCREMENT BY 1\n" + + " NO MINVALUE\n" + + " NO MAXVALUE\n" + + " CACHE 1;" ; + private static final String CREATE_TABLE_SQL = "CREATE TABLE powerjob_files (\n" + + " id bigint NOT NULL DEFAULT nextval('powerjob_files_id_seq') PRIMARY KEY,\n" + + " bucket varchar(255) NOT NULL,\n" + + " name varchar(255) NOT NULL,\n" + + " version varchar(255) NOT NULL,\n" + + " meta varchar(255) NULL DEFAULT NULL,\n" + + " length bigint NOT NULL,\n" + + " status int NOT NULL,\n" + + " data bytea NOT NULL,\n" + + " extra varchar(255) NULL DEFAULT NULL,\n" + + " gmt_create timestamp without time zone NOT NULL,\n" + + " gmt_modified timestamp without time zone NULL DEFAULT NULL\n" + + ");"; + + private static final String INSERT_SQL = "insert into %s(bucket, name, version, meta, length, status, data, extra, gmt_create, gmt_modified) values (?,?,?,?,?,?,?,?,?,?);"; + + private static final String DELETE_SQL = "DELETE FROM %s "; + + private static final String QUERY_FULL_SQL = "select * from %s"; + + private static final String QUERY_META_SQL = "select bucket, name, version, meta, length, status, extra, gmt_create, gmt_modified from %s"; + + + private void deleteByLocation(FileLocation fileLocation) { + String dSQLPrefix = fullSQL(DELETE_SQL); + String dSQL = dSQLPrefix.concat(whereSQL(fileLocation)); + executeDelete(dSQL); + } + + private void executeDelete(String sql) { + try (Connection con = dataSource.getConnection()) { + con.createStatement().executeUpdate(sql); + } catch (Exception e) { + log.error("[PostgresqlSeriesDfsService] executeDelete failed, sql: {}", sql); + } + } + + @Override + public void store(StoreRequest storeRequest) throws IOException, SQLException { + + Stopwatch sw = Stopwatch.createStarted(); + String insertSQL = fullSQL(INSERT_SQL); + + FileLocation fileLocation = storeRequest.getFileLocation(); + + // 覆盖写,写之前先删除 + deleteByLocation(fileLocation); + + Map meta = Maps.newHashMap(); + meta.put("_server_", serverInfo.getIp()); + meta.put("_local_file_path_", storeRequest.getLocalFile().getAbsolutePath()); + BufferedInputStream bufferedInputStream = new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath())); + + Date date = new Date(System.currentTimeMillis()); + + Connection con =null; + PreparedStatement pst =null; + try { + con = dataSource.getConnection(); + //pg库提示报错:org.postgresql.util.PSQLException: Large Objects may not be used in auto-commit mode. + con.setAutoCommit(false); + log.info("[PostgresqlSeriesDfsService] set autocommit false."); + + pst = con.prepareStatement(insertSQL); + + pst.setString(1, fileLocation.getBucket()); + pst.setString(2, fileLocation.getName()); + pst.setString(3, "mu"); + pst.setString(4, JsonUtils.toJSONString(meta)); + pst.setLong(5, storeRequest.getLocalFile().length()); + pst.setInt(6, SwitchableStatus.ENABLE.getV()); + //PreparedStatement类并没有提供setBlob方法来直接设置BYTEA类型字段,因为PostgreSQL不支持JDBC中的java.sql.Blob接口 +// pst.setBlob(7, bufferedInputStream);org.postgresql.util.PSQLException: ERROR: column "data" is of type bytea but expression is of type bigint + pst.setBytes(7, bufferedInputStreamToByteArray(bufferedInputStream)); + pst.setString(8, null); + pst.setDate(9, date); + pst.setDate(10, date); + + pst.execute(); + con.commit(); + log.info("[PostgresqlSeriesDfsService] store [{}] successfully, cost: {}", fileLocation, sw); + + } catch (Exception e) { + if(con != null){ + con.rollback(); + } + log.error("[PostgresqlSeriesDfsService] store [{}] failed!", fileLocation); + ExceptionUtils.rethrow(e); + }finally { + if(con != null){ + //设置回来,恢复自动提交模式 + con.setAutoCommit(true); + log.info("[PostgresqlSeriesDfsService] set autocommit true."); + con.close(); + } + if(null != pst){ + pst.close(); + } + bufferedInputStream.close(); + } + } + + /** + * 上面已经有异常处理,这里直接往上抛 + * @param bis + * @return + * @throws IOException + */ + public static byte[] bufferedInputStreamToByteArray(BufferedInputStream bis) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + if(null == bis ){ + return null; + } + // 创建缓冲区 + byte[] buffer = new byte[1024]; + int read; + // 读取流中的数据并写入到ByteArrayOutputStream + while ((read = bis.read(buffer)) != -1) { + baos.write(buffer, 0, read); + } + // 关闭输入流 + bis.close(); + // 转换为字节数组并返回 + return baos.toByteArray(); + } + + @Override + public void download(DownloadRequest downloadRequest) throws IOException { + + Stopwatch sw = Stopwatch.createStarted(); + String querySQL = fullSQL(QUERY_FULL_SQL); + + FileLocation fileLocation = downloadRequest.getFileLocation(); + + FileUtils.forceMkdirParent(downloadRequest.getTarget()); + + try (Connection con = dataSource.getConnection()) { + + ResultSet resultSet = con.createStatement().executeQuery(querySQL.concat(whereSQL(fileLocation))); + + boolean exist = resultSet.next(); + + if (!exist) { + log.warn("[PostgresqlSeriesDfsService] download file[{}] failed due to not exits!", fileLocation); + return; + } + + Blob dataBlob = resultSet.getBlob("data"); + FileUtils.copyInputStreamToFile(new BufferedInputStream(dataBlob.getBinaryStream()), downloadRequest.getTarget()); + + log.info("[PostgresqlSeriesDfsService] download [{}] successfully, cost: {}", fileLocation, sw); + + } catch (Exception e) { + log.error("[PostgresqlSeriesDfsService] download file [{}] failed!", fileLocation, e); + ExceptionUtils.rethrow(e); + } + + } + + @Override + public Optional fetchFileMeta(FileLocation fileLocation) { + + String querySQL = fullSQL(QUERY_META_SQL); + + try (Connection con = dataSource.getConnection()) { + + ResultSet resultSet = con.createStatement().executeQuery(querySQL.concat(whereSQL(fileLocation))); + + boolean exist = resultSet.next(); + + if (!exist) { + return Optional.empty(); + } + + FileMeta fileMeta = new FileMeta() + .setLength(resultSet.getLong("length")) + .setLastModifiedTime(resultSet.getDate("gmt_modified")) + .setMetaInfo(JsonUtils.parseMap(resultSet.getString("meta"))); + return Optional.of(fileMeta); + + } catch (Exception e) { + log.error("[PostgresqlSeriesDfsService] fetchFileMeta [{}] failed!", fileLocation); + ExceptionUtils.rethrow(e); + } + + return Optional.empty(); + } + + @Override + public void cleanExpiredFiles(String bucket, int days) { + + // 虽然官方提供了服务端删除的能力,依然强烈建议用户直接在数据库层面配置清理事件!!! + + String dSQLPrefix = fullSQL(DELETE_SQL); + final long targetTs = DateUtils.addDays(new Date(System.currentTimeMillis()), -days).getTime(); + final String targetDeleteTime = CommonUtils.formatTime(targetTs); + log.info("[PostgresqlSeriesDfsService] start to cleanExpiredFiles, targetDeleteTime: {}", targetDeleteTime); + String fSQL = dSQLPrefix.concat(String.format(" where gmt_modified < '%s'", targetDeleteTime)); + log.info("[PostgresqlSeriesDfsService] cleanExpiredFiles SQL: {}", fSQL); + executeDelete(fSQL); + } + + @Override + protected void init(ApplicationContext applicationContext) { + + Environment env = applicationContext.getEnvironment(); + + PostgresqlProperty postgresqlProperty = new PostgresqlProperty() + .setDriver(fetchProperty(env, TYPE_POSTGRESQL, KEY_DRIVER_NAME)) + .setUrl(fetchProperty(env, TYPE_POSTGRESQL, KEY_URL)) + .setUsername(fetchProperty(env, TYPE_POSTGRESQL, KEY_USERNAME)) + .setPassword(fetchProperty(env, TYPE_POSTGRESQL, KEY_PASSWORD)) + .setAutoCreateTable(Boolean.TRUE.toString().equalsIgnoreCase(fetchProperty(env, TYPE_POSTGRESQL, KEY_AUTO_CREATE_TABLE))) + ; + + try { + initDatabase(postgresqlProperty); + initTable(postgresqlProperty); + } catch (Exception e) { + log.error("[PostgresqlSeriesDfsService] init datasource failed!", e); + ExceptionUtils.rethrow(e); + } + + log.info("[PostgresqlSeriesDfsService] initialize successfully, THIS_WILL_BE_THE_STORAGE_LAYER."); + } + + void initDatabase(PostgresqlProperty property) { + + log.info("[PostgresqlSeriesDfsService] init datasource by config: {}", property); + + HikariConfig config = new HikariConfig(); + + config.setDriverClassName(property.driver); + config.setJdbcUrl(property.url); + config.setUsername(property.username); + config.setPassword(property.password); + + config.setAutoCommit(true); + // 池中最小空闲连接数量 + config.setMinimumIdle(2); + // 池中最大连接数量 + config.setMaximumPoolSize(32); + + dataSource = new HikariDataSource(config); + } + + void initTable(PostgresqlProperty property) throws Exception { + + if (property.autoCreateTable) { + + String powerjobFilesIdSeq = fullSQL(POWERJOB_FILES_ID_SEQ); + String createTableSQL = fullSQL(CREATE_TABLE_SQL); + + log.info("[PostgresqlSeriesDfsService] use create table SQL: {}", createTableSQL); + try (Connection connection = dataSource.getConnection()) { + connection.createStatement().execute(powerjobFilesIdSeq); + connection.createStatement().execute(createTableSQL); + log.info("[PostgresqlSeriesDfsService] auto create table successfully!"); + } + } + } + + private String fullSQL(String sql) { + return String.format(sql, parseTableName()); + } + + private String parseTableName() { + // 误删,兼容本地 unit test + if (applicationContext == null) { + return DEFAULT_TABLE_NAME; + } + String tableName = fetchProperty(applicationContext.getEnvironment(), TYPE_POSTGRESQL, KEY_TABLE_NAME); + return StringUtils.isEmpty(tableName) ? DEFAULT_TABLE_NAME : tableName; + } + + private static String whereSQL(FileLocation fileLocation) { + return String.format(" where bucket='%s' AND name='%s' ", fileLocation.getBucket(), fileLocation.getName()); + } + + @Override + public void destroy() throws Exception { + } + + @Data + @Accessors(chain = true) + static class PostgresqlProperty { + private String driver; + private String url; + private String username; + private String password; + + private boolean autoCreateTable; + } + + public static class PostgresqlSeriesCondition extends PropertyAndOneBeanCondition { + @Override + protected List anyConfigKey() { + return Lists.newArrayList("oms.storage.dfs.postgresql_series.url"); + } + + @Override + protected Class beanType() { + return DFsService.class; + } + } +} diff --git a/powerjob-server/powerjob-server-starter/src/main/resources/application-daily.properties b/powerjob-server/powerjob-server-starter/src/main/resources/application-daily.properties index 6722e8b7..a84693d4 100644 --- a/powerjob-server/powerjob-server-starter/src/main/resources/application-daily.properties +++ b/powerjob-server/powerjob-server-starter/src/main/resources/application-daily.properties @@ -17,6 +17,14 @@ oms.storage.dfs.mysql_series.username=root oms.storage.dfs.mysql_series.password=No1Bug2Please3! oms.storage.dfs.mysql_series.auto_create_table=true +#postgresql +#oms.storage.dfs.postgresql_series.driver=org.postgresql.Driver +#oms.storage.dfs.postgresql_series.url=jdbc:postgresql://127.0.0.15432/powerjob-daily +#oms.storage.dfs.postgresql_series.username=root +#oms.storage.dfs.postgresql_series.password=root +#oms.storage.dfs.postgresql_series.auto_create_table=true + + ####### Email properties(Non-core configuration properties) ####### ####### Delete the following code to disable the mail ####### spring.mail.host=smtp.163.com