From d546b2f814b2d0404318784b0622fe3a56da4ffb Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 7 May 2020 16:33:34 +0800 Subject: [PATCH] [opt] change @Transactional to TransactionTemplate --- .../persistence/config/LocalJpaConfig.java | 11 ++ .../config/MultiDatasourceConfig.java | 2 +- .../server/service/InstanceLogService.java | 109 ++++++++++-------- .../tester/OmsLogPerformanceTester.java | 1 + 4 files changed, 76 insertions(+), 47 deletions(-) diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/config/LocalJpaConfig.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/config/LocalJpaConfig.java index 397b4966..81e2383b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/config/LocalJpaConfig.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/config/LocalJpaConfig.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.server.persistence.config; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties; import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings; import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties; @@ -10,7 +11,9 @@ import org.springframework.data.jpa.repository.config.EnableJpaRepositories; import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.annotation.EnableTransactionManagement; +import org.springframework.transaction.support.TransactionTemplate; import javax.annotation.Resource; import javax.sql.DataSource; @@ -68,4 +71,12 @@ public class LocalJpaConfig { public PlatformTransactionManager initLocalTransactionManager(EntityManagerFactoryBuilder builder) { return new JpaTransactionManager(Objects.requireNonNull(initLocalEntityManagerFactory(builder).getObject())); } + + @Bean(name = "localTransactionTemplate") + public TransactionTemplate initTransactionTemplate(@Qualifier("localTransactionManager") PlatformTransactionManager ptm) { + TransactionTemplate tt = new TransactionTemplate(ptm); + // 设置隔离级别 + tt.setIsolationLevel(TransactionDefinition.ISOLATION_DEFAULT); + return tt; + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/config/MultiDatasourceConfig.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/config/MultiDatasourceConfig.java index 09fd07c4..f479a0f2 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/config/MultiDatasourceConfig.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/config/MultiDatasourceConfig.java @@ -20,7 +20,7 @@ import javax.sql.DataSource; @Configuration public class MultiDatasourceConfig { - private static final String H2_JDBC_URL = "jdbc:h2:file:~/oms/h2/oms_server_db"; + private static final String H2_JDBC_URL = "jdbc:h2:file:~/oms-server/h2/oms_server_db"; @Primary @Bean("omsCoreDatasource") diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java index 0d32126f..8cddda28 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java @@ -23,10 +23,10 @@ import org.springframework.data.mongodb.gridfs.GridFsTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; -import org.springframework.util.FileCopyUtils; +import javax.annotation.Resource; import java.io.*; import java.util.List; import java.util.Map; @@ -46,6 +46,11 @@ public class InstanceLogService { // 直接操作 mongoDB 文件系统 private GridFsTemplate gridFsTemplate; + + // 本地数据库操作bean + @Resource(name = "localTransactionTemplate") + private TransactionTemplate localTransactionTemplate; + @Resource private LocalInstanceLogRepository localInstanceLogRepository; // 本地维护了在线日志的任务实例ID @@ -95,7 +100,6 @@ public class InstanceLogService { * @param index 页码 * @return 文本字符串 */ - @Transactional public StringPage fetchInstanceLog(Long instanceId, long index) { try { Future fileFuture = prepareLogFile(instanceId); @@ -151,7 +155,6 @@ public class InstanceLogService { * 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行 * @param instanceId 任务实例ID */ - @Transactional @Async("omsCommonPool") public void sync(Long instanceId) { @@ -191,52 +194,58 @@ public class InstanceLogService { } } - private File genTemporaryLogFile(long instanceId) throws IOException { + private File genTemporaryLogFile(long instanceId) { String path = genLogFilePath(instanceId, false); synchronized (("tpFileLock-" + instanceId).intern()) { - File f = new File(path); - // 如果文件存在且有效,则不再重新构建日志文件(这个判断也需要放在锁内,否则构建到一半的文件会被返回) - if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < EXPIRE_INTERVAL_MS) { - return f; - } - // 重新构建文件 - try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) { - stream2File(allLogStream, f); - } - return f; - } - } - private File genStableLogFile(long instanceId) throws IOException { - String path = genLogFilePath(instanceId, true); - synchronized (("stFileLock-" + instanceId).intern()) { - File f = new File(path); - if (f.exists()) { - return f; - } - - // 本地存在数据,从本地持久化(对应 SYNC 的情况) - if (instanceId2LastReportTime.containsKey(instanceId)) { + // Stream 需要在事务的包裹之下使用 + return localTransactionTemplate.execute(status -> { + File f = new File(path); + // 如果文件存在且有效,则不再重新构建日志文件(这个判断也需要放在锁内,否则构建到一半的文件会被返回) + if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < EXPIRE_INTERVAL_MS) { + return f; + } + // 重新构建文件 try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) { stream2File(allLogStream, f); } - }else { + return f; + }); + } + } - if (gridFsTemplate == null) { - FileCopyUtils.copy("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.".getBytes(), f); + private File genStableLogFile(long instanceId) { + String path = genLogFilePath(instanceId, true); + synchronized (("stFileLock-" + instanceId).intern()) { + return localTransactionTemplate.execute(status -> { + File f = new File(path); + if (f.exists()) { return f; } - // 否则从 mongoDB 拉取数据(对应后期查询的情况) - GridFsResource gridFsResource = gridFsTemplate.getResource(genMongoFileName(instanceId)); + // 本地存在数据,从本地持久化(对应 SYNC 的情况) + if (instanceId2LastReportTime.containsKey(instanceId)) { + try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) { + stream2File(allLogStream, f); + } + }else { - if (!gridFsResource.exists()) { - FileCopyUtils.copy("SYSTEM: There is no online log for this job instance.".getBytes(), f); - return f; + if (gridFsTemplate == null) { + string2File("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.", f); + return f; + } + + // 否则从 mongoDB 拉取数据(对应后期查询的情况) + GridFsResource gridFsResource = gridFsTemplate.getResource(genMongoFileName(instanceId)); + + if (!gridFsResource.exists()) { + string2File("SYSTEM: There is no online log for this job instance.", f); + return f; + } + gridFs2File(gridFsResource, f); } - gridFs2File(gridFsResource, f); - } - return f; + return f; + }); } } @@ -245,7 +254,7 @@ public class InstanceLogService { * @param stream 流 * @param logFile 目标日志文件 */ - private void stream2File(Stream stream, File logFile) throws IOException { + private void stream2File(Stream stream, File logFile) { if (!logFile.getParentFile().exists()) { if (!logFile.getParentFile().mkdirs()) { log.warn("[InstanceLogService] create dir for instanceLog failed, path is {}.", logFile.getPath()); @@ -259,6 +268,8 @@ public class InstanceLogService { }catch (Exception ignore) { } }); + }catch (IOException ie) { + ExceptionUtils.rethrow(ie); } } @@ -267,7 +278,7 @@ public class InstanceLogService { * @param gridFsResource mongoDB 文件资源 * @param logFile 本地文件资源 */ - private void gridFs2File(GridFsResource gridFsResource, File logFile) throws IOException { + private void gridFs2File(GridFsResource gridFsResource, File logFile) { byte[] buffer = new byte[1024]; try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream()); BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile)) @@ -276,6 +287,16 @@ public class InstanceLogService { bos.write(buffer); } bos.flush(); + }catch (IOException ie) { + ExceptionUtils.rethrow(ie); + } + } + + private void string2File(String content, File logFile) { + try(FileWriter fw = new FileWriter(logFile)) { + fw.write(content); + }catch (IOException ie) { + ExceptionUtils.rethrow(ie); } } @@ -325,9 +346,9 @@ public class InstanceLogService { private static String genLogFilePath(long instanceId, boolean stable) { if (stable) { - return USER_HOME + "/oms/online_log/" + String.format("%d-stable.log", instanceId); + return USER_HOME + "/oms-server/online_log/" + String.format("%d-stable.log", instanceId); }else { - return USER_HOME + "/oms/online_log/" + String.format("%d-temporary.log", instanceId); + return USER_HOME + "/oms-server/online_log/" + String.format("%d-temporary.log", instanceId); } } private static String genMongoFileName(long instanceId) { @@ -338,8 +359,4 @@ public class InstanceLogService { public void setGridFsTemplate(GridFsTemplate gridFsTemplate) { this.gridFsTemplate = gridFsTemplate; } - @Autowired - public void setLocalInstanceLogRepository(LocalInstanceLogRepository localInstanceLogRepository) { - this.localInstanceLogRepository = localInstanceLogRepository; - } } diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/tester/OmsLogPerformanceTester.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/tester/OmsLogPerformanceTester.java index cb46756c..0f5fefa3 100644 --- a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/tester/OmsLogPerformanceTester.java +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/tester/OmsLogPerformanceTester.java @@ -31,6 +31,7 @@ public class OmsLogPerformanceTester implements BasicProcessor { for (long i = 0; i < times; i++) { for (long j = 0; j < BATCH; j++) { long index = i * BATCH + j; + System.out.println("send index: " + index); logger.info("[OmsLogPerformanceTester] testing omsLogger performance, current index is {}.", index); } logger.error("[OmsLogPerformanceTester] Oh, we have an exception to log~", re);