From b35dc151945bc2c159143aa76d3b2c7b1b40bb3c Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 7 May 2020 13:02:32 +0800 Subject: [PATCH] opt omsOnlineLog --- .../server/service/InstanceLogService.java | 250 +++++++++--------- 1 file changed, 122 insertions(+), 128 deletions(-) diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java index dd5075a3..0d32126f 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java @@ -11,9 +11,10 @@ import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogMetadata; import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Queues; -import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.time.FastDateFormat; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -24,15 +25,12 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; +import org.springframework.util.FileCopyUtils; import java.io.*; import java.util.List; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.Map; +import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -51,11 +49,8 @@ public class InstanceLogService { private LocalInstanceLogRepository localInstanceLogRepository; // 本地维护了在线日志的任务实例ID - private final Set instanceIds = Sets.newConcurrentHashSet(); - // 锁(可重入锁也太坑了吧,需要考虑同一个线程重复下载的问题 -> 因为下载交给了额外的线程去做...) - private final int lockNum; - private final Lock[] locks; - private final Executor workerPool; + private final Map instanceId2LastReportTime = Maps.newConcurrentMap(); + private final ExecutorService workerPool; // 格式化时间戳 private static final FastDateFormat dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS"); @@ -65,17 +60,10 @@ public class InstanceLogService { private static final int MAX_LINE_COUNT = 500; // 过期时间 private static final long EXPIRE_INTERVAL_MS = 60000; - // 小文件阈值(2M) - private static final int SMALL_FILE_MAX_SIZE = 2 * 1024 * 1024; public InstanceLogService() { - lockNum = Runtime.getRuntime().availableProcessors(); - locks = new ReentrantLock[lockNum]; - for (int i = 0; i < lockNum; i++) { - locks[i] = new ReentrantLock(); - } - - workerPool = new ThreadPoolExecutor(lockNum, lockNum, 1, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue()); + int coreSize = Runtime.getRuntime().availableProcessors(); + workerPool = new ThreadPoolExecutor(coreSize, coreSize, 1, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue()); } /** @@ -86,7 +74,7 @@ public class InstanceLogService { public void submitLogs(String workerAddress, List logs) { List logList = logs.stream().map(x -> { - instanceIds.add(x.getInstanceId()); + instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis()); LocalInstanceLogDO y = new LocalInstanceLogDO(); BeanUtils.copyProperties(x, y); @@ -107,59 +95,11 @@ public class InstanceLogService { * @param index 页码 * @return 文本字符串 */ - @Transactional(readOnly = true) + @Transactional public StringPage fetchInstanceLog(Long instanceId, long index) { - - File logFile = new File(genLogFilePath(instanceId)); - Lock lock = locks[(int) (instanceId % lockNum)]; - - lock.lock(); try { - long logCount = localInstanceLogRepository.countByInstanceId(instanceId); - - // 直接从本地数据库构建日志文件 - if (logCount != 0) { - - // 存在则判断上次更改时间,1分钟内有效 - if (logFile.exists()) { - long offset = System.currentTimeMillis() - logFile.lastModified(); - // 过期才选择重新构建文件 - if (offset > EXPIRE_INTERVAL_MS) { - Stream logStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId); - - // 这里直接用 Controller 线程执行,毕竟本地持久化用不了多少时间,因此不用考虑可重入锁的问题 - stream2File(logStream, instanceId); - } - } - - }else { - - if (gridFsTemplate == null) { - return StringPage.simple("There is no local log for this task now, you need to use mongoDB to store the past logs."); - } - - // 不存在,需要重新下载 - if (!logFile.exists()) { - GridFsResource gridFsResource = gridFsTemplate.getResource(genLogFileName(instanceId)); - - if (!gridFsResource.exists()) { - return StringPage.simple("There is no online log for this job instance."); - } - - long targetFileSize = gridFsResource.contentLength(); - // 小文件 Controller 线程直接执行,大文件则异步下载,先返回 downloading... - if (targetFileSize <= SMALL_FILE_MAX_SIZE) { - gridFs2File(gridFsResource, logFile); - }else { - workerPool.execute(() -> gridFs2File(gridFsResource, logFile)); - return StringPage.simple("downloading from mongoDB, please retry some time later~"); - } - } - } - - if (!logFile.exists()) { - return StringPage.simple("There is no online log for this task instance now."); - } + Future fileFuture = prepareLogFile(instanceId); + File logFile = fileFuture.get(5, TimeUnit.SECONDS); // 分页展示数据 long lines = 0; @@ -178,18 +118,35 @@ public class InstanceLogService { } }catch (Exception e) { log.warn("[InstanceLogService] read logFile from disk failed.", e); + return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e)); } double totalPage = Math.ceil(1.0 * lines / MAX_LINE_COUNT); return new StringPage(index, (long) totalPage, sb.toString()); + + }catch (TimeoutException te) { + return StringPage.simple("log file is being prepared, please try again later."); }catch (Exception e) { - log.error("[InstanceLogService] fetchInstanceLog for instance(instanceId={}) failed.", instanceId, e); - return StringPage.simple("unknown error from oms-server, please see oms-server's log to find the problem"); - }finally { - lock.unlock(); + log.warn("[InstanceLogService] fetchInstanceLog failed for instance(instanceId={}).", instanceId, e); + return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e)); } } + /** + * 异步准备日志文件 + * @param instanceId 任务实例ID + * @return 异步结果 + */ + private Future prepareLogFile(long instanceId) { + return workerPool.submit(() -> { + // 在线日志还在不断更新,需要使用本地数据库中的数据 + if (instanceId2LastReportTime.containsKey(instanceId)) { + return genTemporaryLogFile(instanceId); + } + return genStableLogFile(instanceId); + }); + } + /** * 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行 * @param instanceId 任务实例ID @@ -205,52 +162,90 @@ public class InstanceLogService { } Stopwatch sw = Stopwatch.createStarted(); - - if (gridFsTemplate != null) { - - File logFile = new File(genLogFilePath(instanceId)); - - // 先持久化到本地磁盘 - try { - Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId); - stream2File(allLogStream, instanceId); - }catch (Exception e) { - log.warn("[InstanceLogService] get log stream failed for instance(instanceId={}).", instanceId, e); - } - - // 推送到 mongoDB - try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(logFile))) { - - InstanceLogMetadata metadata = new InstanceLogMetadata(); - metadata.setInstanceId(instanceId); - metadata.setFileSize(logFile.length()); - metadata.setCreatedTime(System.currentTimeMillis()); - - gridFsTemplate.store(bis, genLogFileName(instanceId), metadata); - - }catch (Exception e) { - log.warn("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB failed.", instanceId, e); - } - } - - // 删除本地数据 try { - long total = CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); + // 先持久化到本地文件 + File stableLogFile = genStableLogFile(instanceId); + // 将文件推送到 MongoDB + if (gridFsTemplate != null) { + try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(stableLogFile))) { - instanceIds.remove(instanceId); - log.info("[InstanceLogService] sync local instanceLogs(instanceId={}) to mongoDB succeed, total logs: {},using: {}.", instanceId, total, sw.stop()); + InstanceLogMetadata metadata = new InstanceLogMetadata(); + metadata.setInstanceId(instanceId); + metadata.setFileSize(stableLogFile.length()); + metadata.setCreatedTime(System.currentTimeMillis()); + gridFsTemplate.store(bis, genMongoFileName(instanceId), metadata); + log.info("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB succeed, using: {}.", instanceId, sw.stop()); + }catch (Exception e) { + log.warn("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB failed.", instanceId, e); + } + } }catch (Exception e) { - log.warn("[InstanceLogService] delete local instanceLogs failed.", e); + log.warn("[InstanceLogService] sync local instanceLogs(instanceId={}) failed.", instanceId, e); + } + // 删除本地数据库数据 + try { + CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); + instanceId2LastReportTime.remove(instanceId); + }catch (Exception e) { + log.warn("[InstanceLogService] delete local instanceLog(instanceId={}) failed.", instanceId, e); + } + } + + private File genTemporaryLogFile(long instanceId) throws IOException { + String path = genLogFilePath(instanceId, false); + synchronized (("tpFileLock-" + instanceId).intern()) { + File f = new File(path); + // 如果文件存在且有效,则不再重新构建日志文件(这个判断也需要放在锁内,否则构建到一半的文件会被返回) + if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < EXPIRE_INTERVAL_MS) { + return f; + } + // 重新构建文件 + try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) { + stream2File(allLogStream, f); + } + return f; + } + } + + private File genStableLogFile(long instanceId) throws IOException { + String path = genLogFilePath(instanceId, true); + synchronized (("stFileLock-" + instanceId).intern()) { + File f = new File(path); + if (f.exists()) { + return f; + } + + // 本地存在数据,从本地持久化(对应 SYNC 的情况) + if (instanceId2LastReportTime.containsKey(instanceId)) { + try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) { + stream2File(allLogStream, f); + } + }else { + + if (gridFsTemplate == null) { + FileCopyUtils.copy("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.".getBytes(), f); + return f; + } + + // 否则从 mongoDB 拉取数据(对应后期查询的情况) + GridFsResource gridFsResource = gridFsTemplate.getResource(genMongoFileName(instanceId)); + + if (!gridFsResource.exists()) { + FileCopyUtils.copy("SYSTEM: There is no online log for this job instance.".getBytes(), f); + return f; + } + gridFs2File(gridFsResource, f); + } + return f; } } /** * 将数据库中存储的日志流转化为磁盘日志文件 * @param stream 流 - * @param instanceId 任务实例ID + * @param logFile 目标日志文件 */ - private void stream2File(Stream stream, long instanceId) { - File logFile = new File(genLogFilePath(instanceId)); + private void stream2File(Stream stream, File logFile) throws IOException { if (!logFile.getParentFile().exists()) { if (!logFile.getParentFile().mkdirs()) { log.warn("[InstanceLogService] create dir for instanceLog failed, path is {}.", logFile.getPath()); @@ -264,10 +259,6 @@ public class InstanceLogService { }catch (Exception ignore) { } }); - }catch (Exception e) { - log.warn("[InstanceLogService] write instanceLog(instanceId={}) to local file failed.", instanceId, e); - }finally { - stream.close(); } } @@ -276,7 +267,7 @@ public class InstanceLogService { * @param gridFsResource mongoDB 文件资源 * @param logFile 本地文件资源 */ - private void gridFs2File(GridFsResource gridFsResource, File logFile) { + private void gridFs2File(GridFsResource gridFsResource, File logFile) throws IOException { byte[] buffer = new byte[1024]; try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream()); BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile)) @@ -285,21 +276,17 @@ public class InstanceLogService { bos.write(buffer); } bos.flush(); - }catch (Exception e) { - log.warn("[InstanceLogService] download instanceLog to local file({}) failed.", logFile.getName(), e); } } - /** * 拼接日志 -> 2020-04-29 22:07:10.059 192.168.1.1:2777 INFO XXX * @param instanceLog 日志对象 * @return 字符串 */ private static String convertLog(LocalInstanceLogDO instanceLog) { - String pattern = "%s [%s] -%s"; - return String.format(pattern, dateFormat.format(instanceLog.getLogTime()), instanceLog.getLogContent()); + return String.format("%s [%s] -%s", dateFormat.format(instanceLog.getLogTime()), instanceLog.getWorkerAddress(), instanceLog.getLogContent()); } @@ -309,7 +296,7 @@ public class InstanceLogService { // 1. 定时删除秒级任务的日志 List frequentInstanceIds = Lists.newLinkedList(); - instanceIds.forEach(instanceId -> { + instanceId2LastReportTime.keySet().forEach(instanceId -> { JobInfoDO jobInfo = InstanceManager.fetchJobInfo(instanceId); if (jobInfo == null) { return; @@ -331,13 +318,20 @@ public class InstanceLogService { } }); } + + // 2. 删除长时间未 REPORT 的日志 } - private static String genLogFileName(long instanceId) { - return String.format("%d.log", instanceId); + + private static String genLogFilePath(long instanceId, boolean stable) { + if (stable) { + return USER_HOME + "/oms/online_log/" + String.format("%d-stable.log", instanceId); + }else { + return USER_HOME + "/oms/online_log/" + String.format("%d-temporary.log", instanceId); + } } - private static String genLogFilePath(long instanceId) { - return USER_HOME + "/oms/online_log/" + genLogFileName(instanceId); + private static String genMongoFileName(long instanceId) { + return String.format("oms-%d.log", instanceId); } @Autowired(required = false)