mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
[opt] change @Transactional to TransactionTemplate
This commit is contained in:
parent
b35dc15194
commit
d546b2f814
@ -1,5 +1,6 @@
|
|||||||
package com.github.kfcfans.oms.server.persistence.config;
|
package com.github.kfcfans.oms.server.persistence.config;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
|
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
|
||||||
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
|
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
|
||||||
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
|
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
|
||||||
@ -10,7 +11,9 @@ import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
|
|||||||
import org.springframework.orm.jpa.JpaTransactionManager;
|
import org.springframework.orm.jpa.JpaTransactionManager;
|
||||||
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
|
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
import org.springframework.transaction.TransactionDefinition;
|
||||||
import org.springframework.transaction.annotation.EnableTransactionManagement;
|
import org.springframework.transaction.annotation.EnableTransactionManagement;
|
||||||
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
@ -68,4 +71,12 @@ public class LocalJpaConfig {
|
|||||||
public PlatformTransactionManager initLocalTransactionManager(EntityManagerFactoryBuilder builder) {
|
public PlatformTransactionManager initLocalTransactionManager(EntityManagerFactoryBuilder builder) {
|
||||||
return new JpaTransactionManager(Objects.requireNonNull(initLocalEntityManagerFactory(builder).getObject()));
|
return new JpaTransactionManager(Objects.requireNonNull(initLocalEntityManagerFactory(builder).getObject()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean(name = "localTransactionTemplate")
|
||||||
|
public TransactionTemplate initTransactionTemplate(@Qualifier("localTransactionManager") PlatformTransactionManager ptm) {
|
||||||
|
TransactionTemplate tt = new TransactionTemplate(ptm);
|
||||||
|
// 设置隔离级别
|
||||||
|
tt.setIsolationLevel(TransactionDefinition.ISOLATION_DEFAULT);
|
||||||
|
return tt;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,7 @@ import javax.sql.DataSource;
|
|||||||
@Configuration
|
@Configuration
|
||||||
public class MultiDatasourceConfig {
|
public class MultiDatasourceConfig {
|
||||||
|
|
||||||
private static final String H2_JDBC_URL = "jdbc:h2:file:~/oms/h2/oms_server_db";
|
private static final String H2_JDBC_URL = "jdbc:h2:file:~/oms-server/h2/oms_server_db";
|
||||||
|
|
||||||
@Primary
|
@Primary
|
||||||
@Bean("omsCoreDatasource")
|
@Bean("omsCoreDatasource")
|
||||||
|
@ -23,10 +23,10 @@ import org.springframework.data.mongodb.gridfs.GridFsTemplate;
|
|||||||
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
import org.springframework.util.FileCopyUtils;
|
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -46,6 +46,11 @@ public class InstanceLogService {
|
|||||||
|
|
||||||
// 直接操作 mongoDB 文件系统
|
// 直接操作 mongoDB 文件系统
|
||||||
private GridFsTemplate gridFsTemplate;
|
private GridFsTemplate gridFsTemplate;
|
||||||
|
|
||||||
|
// 本地数据库操作bean
|
||||||
|
@Resource(name = "localTransactionTemplate")
|
||||||
|
private TransactionTemplate localTransactionTemplate;
|
||||||
|
@Resource
|
||||||
private LocalInstanceLogRepository localInstanceLogRepository;
|
private LocalInstanceLogRepository localInstanceLogRepository;
|
||||||
|
|
||||||
// 本地维护了在线日志的任务实例ID
|
// 本地维护了在线日志的任务实例ID
|
||||||
@ -95,7 +100,6 @@ public class InstanceLogService {
|
|||||||
* @param index 页码
|
* @param index 页码
|
||||||
* @return 文本字符串
|
* @return 文本字符串
|
||||||
*/
|
*/
|
||||||
@Transactional
|
|
||||||
public StringPage fetchInstanceLog(Long instanceId, long index) {
|
public StringPage fetchInstanceLog(Long instanceId, long index) {
|
||||||
try {
|
try {
|
||||||
Future<File> fileFuture = prepareLogFile(instanceId);
|
Future<File> fileFuture = prepareLogFile(instanceId);
|
||||||
@ -151,7 +155,6 @@ public class InstanceLogService {
|
|||||||
* 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行
|
* 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行
|
||||||
* @param instanceId 任务实例ID
|
* @param instanceId 任务实例ID
|
||||||
*/
|
*/
|
||||||
@Transactional
|
|
||||||
@Async("omsCommonPool")
|
@Async("omsCommonPool")
|
||||||
public void sync(Long instanceId) {
|
public void sync(Long instanceId) {
|
||||||
|
|
||||||
@ -191,52 +194,58 @@ public class InstanceLogService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private File genTemporaryLogFile(long instanceId) throws IOException {
|
private File genTemporaryLogFile(long instanceId) {
|
||||||
String path = genLogFilePath(instanceId, false);
|
String path = genLogFilePath(instanceId, false);
|
||||||
synchronized (("tpFileLock-" + instanceId).intern()) {
|
synchronized (("tpFileLock-" + instanceId).intern()) {
|
||||||
File f = new File(path);
|
|
||||||
// 如果文件存在且有效,则不再重新构建日志文件(这个判断也需要放在锁内,否则构建到一半的文件会被返回)
|
|
||||||
if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < EXPIRE_INTERVAL_MS) {
|
|
||||||
return f;
|
|
||||||
}
|
|
||||||
// 重新构建文件
|
|
||||||
try (Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
|
|
||||||
stream2File(allLogStream, f);
|
|
||||||
}
|
|
||||||
return f;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private File genStableLogFile(long instanceId) throws IOException {
|
// Stream 需要在事务的包裹之下使用
|
||||||
String path = genLogFilePath(instanceId, true);
|
return localTransactionTemplate.execute(status -> {
|
||||||
synchronized (("stFileLock-" + instanceId).intern()) {
|
File f = new File(path);
|
||||||
File f = new File(path);
|
// 如果文件存在且有效,则不再重新构建日志文件(这个判断也需要放在锁内,否则构建到一半的文件会被返回)
|
||||||
if (f.exists()) {
|
if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < EXPIRE_INTERVAL_MS) {
|
||||||
return f;
|
return f;
|
||||||
}
|
}
|
||||||
|
// 重新构建文件
|
||||||
// 本地存在数据,从本地持久化(对应 SYNC 的情况)
|
|
||||||
if (instanceId2LastReportTime.containsKey(instanceId)) {
|
|
||||||
try (Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
|
try (Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
|
||||||
stream2File(allLogStream, f);
|
stream2File(allLogStream, f);
|
||||||
}
|
}
|
||||||
}else {
|
return f;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (gridFsTemplate == null) {
|
private File genStableLogFile(long instanceId) {
|
||||||
FileCopyUtils.copy("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.".getBytes(), f);
|
String path = genLogFilePath(instanceId, true);
|
||||||
|
synchronized (("stFileLock-" + instanceId).intern()) {
|
||||||
|
return localTransactionTemplate.execute(status -> {
|
||||||
|
File f = new File(path);
|
||||||
|
if (f.exists()) {
|
||||||
return f;
|
return f;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 否则从 mongoDB 拉取数据(对应后期查询的情况)
|
// 本地存在数据,从本地持久化(对应 SYNC 的情况)
|
||||||
GridFsResource gridFsResource = gridFsTemplate.getResource(genMongoFileName(instanceId));
|
if (instanceId2LastReportTime.containsKey(instanceId)) {
|
||||||
|
try (Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
|
||||||
|
stream2File(allLogStream, f);
|
||||||
|
}
|
||||||
|
}else {
|
||||||
|
|
||||||
if (!gridFsResource.exists()) {
|
if (gridFsTemplate == null) {
|
||||||
FileCopyUtils.copy("SYSTEM: There is no online log for this job instance.".getBytes(), f);
|
string2File("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.", f);
|
||||||
return f;
|
return f;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 否则从 mongoDB 拉取数据(对应后期查询的情况)
|
||||||
|
GridFsResource gridFsResource = gridFsTemplate.getResource(genMongoFileName(instanceId));
|
||||||
|
|
||||||
|
if (!gridFsResource.exists()) {
|
||||||
|
string2File("SYSTEM: There is no online log for this job instance.", f);
|
||||||
|
return f;
|
||||||
|
}
|
||||||
|
gridFs2File(gridFsResource, f);
|
||||||
}
|
}
|
||||||
gridFs2File(gridFsResource, f);
|
return f;
|
||||||
}
|
});
|
||||||
return f;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -245,7 +254,7 @@ public class InstanceLogService {
|
|||||||
* @param stream 流
|
* @param stream 流
|
||||||
* @param logFile 目标日志文件
|
* @param logFile 目标日志文件
|
||||||
*/
|
*/
|
||||||
private void stream2File(Stream<LocalInstanceLogDO> stream, File logFile) throws IOException {
|
private void stream2File(Stream<LocalInstanceLogDO> stream, File logFile) {
|
||||||
if (!logFile.getParentFile().exists()) {
|
if (!logFile.getParentFile().exists()) {
|
||||||
if (!logFile.getParentFile().mkdirs()) {
|
if (!logFile.getParentFile().mkdirs()) {
|
||||||
log.warn("[InstanceLogService] create dir for instanceLog failed, path is {}.", logFile.getPath());
|
log.warn("[InstanceLogService] create dir for instanceLog failed, path is {}.", logFile.getPath());
|
||||||
@ -259,6 +268,8 @@ public class InstanceLogService {
|
|||||||
}catch (Exception ignore) {
|
}catch (Exception ignore) {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}catch (IOException ie) {
|
||||||
|
ExceptionUtils.rethrow(ie);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -267,7 +278,7 @@ public class InstanceLogService {
|
|||||||
* @param gridFsResource mongoDB 文件资源
|
* @param gridFsResource mongoDB 文件资源
|
||||||
* @param logFile 本地文件资源
|
* @param logFile 本地文件资源
|
||||||
*/
|
*/
|
||||||
private void gridFs2File(GridFsResource gridFsResource, File logFile) throws IOException {
|
private void gridFs2File(GridFsResource gridFsResource, File logFile) {
|
||||||
byte[] buffer = new byte[1024];
|
byte[] buffer = new byte[1024];
|
||||||
try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream());
|
try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream());
|
||||||
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile))
|
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile))
|
||||||
@ -276,6 +287,16 @@ public class InstanceLogService {
|
|||||||
bos.write(buffer);
|
bos.write(buffer);
|
||||||
}
|
}
|
||||||
bos.flush();
|
bos.flush();
|
||||||
|
}catch (IOException ie) {
|
||||||
|
ExceptionUtils.rethrow(ie);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void string2File(String content, File logFile) {
|
||||||
|
try(FileWriter fw = new FileWriter(logFile)) {
|
||||||
|
fw.write(content);
|
||||||
|
}catch (IOException ie) {
|
||||||
|
ExceptionUtils.rethrow(ie);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -325,9 +346,9 @@ public class InstanceLogService {
|
|||||||
|
|
||||||
private static String genLogFilePath(long instanceId, boolean stable) {
|
private static String genLogFilePath(long instanceId, boolean stable) {
|
||||||
if (stable) {
|
if (stable) {
|
||||||
return USER_HOME + "/oms/online_log/" + String.format("%d-stable.log", instanceId);
|
return USER_HOME + "/oms-server/online_log/" + String.format("%d-stable.log", instanceId);
|
||||||
}else {
|
}else {
|
||||||
return USER_HOME + "/oms/online_log/" + String.format("%d-temporary.log", instanceId);
|
return USER_HOME + "/oms-server/online_log/" + String.format("%d-temporary.log", instanceId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private static String genMongoFileName(long instanceId) {
|
private static String genMongoFileName(long instanceId) {
|
||||||
@ -338,8 +359,4 @@ public class InstanceLogService {
|
|||||||
public void setGridFsTemplate(GridFsTemplate gridFsTemplate) {
|
public void setGridFsTemplate(GridFsTemplate gridFsTemplate) {
|
||||||
this.gridFsTemplate = gridFsTemplate;
|
this.gridFsTemplate = gridFsTemplate;
|
||||||
}
|
}
|
||||||
@Autowired
|
|
||||||
public void setLocalInstanceLogRepository(LocalInstanceLogRepository localInstanceLogRepository) {
|
|
||||||
this.localInstanceLogRepository = localInstanceLogRepository;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,7 @@ public class OmsLogPerformanceTester implements BasicProcessor {
|
|||||||
for (long i = 0; i < times; i++) {
|
for (long i = 0; i < times; i++) {
|
||||||
for (long j = 0; j < BATCH; j++) {
|
for (long j = 0; j < BATCH; j++) {
|
||||||
long index = i * BATCH + j;
|
long index = i * BATCH + j;
|
||||||
|
System.out.println("send index: " + index);
|
||||||
logger.info("[OmsLogPerformanceTester] testing omsLogger performance, current index is {}.", index);
|
logger.info("[OmsLogPerformanceTester] testing omsLogger performance, current index is {}.", index);
|
||||||
}
|
}
|
||||||
logger.error("[OmsLogPerformanceTester] Oh, we have an exception to log~", re);
|
logger.error("[OmsLogPerformanceTester] Oh, we have an exception to log~", re);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user