fix: PostgresqlSeriesDfsService can't restart on startup #974

This commit is contained in:
tjq 2024-10-26 01:16:25 +08:00
parent 827bcd2502
commit 243f7bb179

View File

@ -15,32 +15,24 @@ import org.apache.commons.lang3.time.DateUtils;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Conditional;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import tech.powerjob.common.enums.SwitchableStatus;
import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.enums.SwitchableStatus;
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition; import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
import tech.powerjob.server.extension.dfs.DFsService; import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.extension.dfs.DownloadRequest;
import tech.powerjob.server.extension.dfs.FileLocation;
import tech.powerjob.server.extension.dfs.FileMeta;
import tech.powerjob.server.extension.dfs.StoreRequest;
import tech.powerjob.server.persistence.storage.AbstractDFsService; import tech.powerjob.server.persistence.storage.AbstractDFsService;
import javax.annotation.Priority;
import javax.sql.DataSource;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.sql.Blob; import java.sql.*;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import javax.annotation.Priority;
import javax.sql.DataSource;
/** /**
* postgresql 数据库存储使用的版本是14 * postgresql 数据库存储使用的版本是14
@ -93,13 +85,13 @@ public class PostgresqlSeriesDfsService extends AbstractDFsService {
private static final String DEFAULT_TABLE_NAME = "powerjob_files"; private static final String DEFAULT_TABLE_NAME = "powerjob_files";
private static final String POWERJOB_FILES_ID_SEQ = "CREATE SEQUENCE powerjob_files_id_seq\n" + private static final String POWERJOB_FILES_ID_SEQ = "CREATE SEQUENCE if not exists powerjob_files_id_seq\n" +
" START WITH 1\n" + " START WITH 1\n" +
" INCREMENT BY 1\n" + " INCREMENT BY 1\n" +
" NO MINVALUE\n" + " NO MINVALUE\n" +
" NO MAXVALUE\n" + " NO MAXVALUE\n" +
" CACHE 1;" ; " CACHE 1;" ;
private static final String CREATE_TABLE_SQL = "CREATE TABLE powerjob_files (\n" + private static final String CREATE_TABLE_SQL = "CREATE TABLE if not exists powerjob_files (\n" +
" id bigint NOT NULL DEFAULT nextval('powerjob_files_id_seq') PRIMARY KEY,\n" + " id bigint NOT NULL DEFAULT nextval('powerjob_files_id_seq') PRIMARY KEY,\n" +
" bucket varchar(255) NOT NULL,\n" + " bucket varchar(255) NOT NULL,\n" +
" name varchar(255) NOT NULL,\n" + " name varchar(255) NOT NULL,\n" +
@ -160,7 +152,6 @@ public class PostgresqlSeriesDfsService extends AbstractDFsService {
con = dataSource.getConnection(); con = dataSource.getConnection();
//pg库提示报错org.postgresql.util.PSQLException: Large Objects may not be used in auto-commit mode. //pg库提示报错org.postgresql.util.PSQLException: Large Objects may not be used in auto-commit mode.
con.setAutoCommit(false); con.setAutoCommit(false);
log.info("[PostgresqlSeriesDfsService] set autocommit false.");
pst = con.prepareStatement(insertSQL); pst = con.prepareStatement(insertSQL);
@ -185,13 +176,12 @@ public class PostgresqlSeriesDfsService extends AbstractDFsService {
if(con != null){ if(con != null){
con.rollback(); con.rollback();
} }
log.error("[PostgresqlSeriesDfsService] store [{}] failed!", fileLocation); log.error("[PostgresqlSeriesDfsService] store [{}] failed!", fileLocation, e);
ExceptionUtils.rethrow(e); ExceptionUtils.rethrow(e);
}finally { }finally {
if(con != null){ if(con != null){
//设置回来恢复自动提交模式 //设置回来恢复自动提交模式
con.setAutoCommit(true); con.setAutoCommit(true);
log.info("[PostgresqlSeriesDfsService] set autocommit true.");
con.close(); con.close();
} }
if(null != pst){ if(null != pst){
@ -246,8 +236,17 @@ public class PostgresqlSeriesDfsService extends AbstractDFsService {
return; return;
} }
// PostgreSQL bytea 类型的数据并不直接映射为 JDBC Blob 类型相反bytea 数据应当被处理为字节数组 (byte[]) 而不是 Blob 对象
try {
byte[] dataBytes = resultSet.getBytes("data");
try (FileOutputStream fos = new FileOutputStream(downloadRequest.getTarget())) {
fos.write(dataBytes);
}
} catch (Exception ignore) {
// 测试发现会报错 报错不良的类型值 long但并未有用户反馈问题暂时保留老写法可能是不同DB获取方式不同
Blob dataBlob = resultSet.getBlob("data"); Blob dataBlob = resultSet.getBlob("data");
FileUtils.copyInputStreamToFile(new BufferedInputStream(dataBlob.getBinaryStream()), downloadRequest.getTarget()); FileUtils.copyInputStreamToFile(new BufferedInputStream(dataBlob.getBinaryStream()), downloadRequest.getTarget());
}
log.info("[PostgresqlSeriesDfsService] download [{}] successfully, cost: {}", fileLocation, sw); log.info("[PostgresqlSeriesDfsService] download [{}] successfully, cost: {}", fileLocation, sw);
@ -331,7 +330,7 @@ public class PostgresqlSeriesDfsService extends AbstractDFsService {
HikariConfig config = new HikariConfig(); HikariConfig config = new HikariConfig();
config.setDriverClassName(property.driver); config.setDriverClassName(StringUtils.isEmpty(property.driver) ? "org.postgresql.Driver" : property.driver);
config.setJdbcUrl(property.url); config.setJdbcUrl(property.url);
config.setUsername(property.username); config.setUsername(property.username);
config.setPassword(property.password); config.setPassword(property.password);