diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/StringPage.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/StringPage.java
new file mode 100644
index 00000000..c150792a
--- /dev/null
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/StringPage.java
@@ -0,0 +1,37 @@
+package com.github.kfcfans.oms.server.persistence;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Paged text content
+ *
+ * @author tjq
+ * @since 2020/5/3
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class StringPage {
+    /**
+     * current page index
+     */
+    private long index;
+    /**
+     * total number of pages
+     */
+    private long totalPages;
+    /**
+     * text data
+     */
+    private String data;
+
+    public static StringPage simple(String data) {
+        StringPage sp = new StringPage();
+        sp.index = 0;
+        sp.totalPages = 1;
+        sp.data = data;
+        return sp;
+    }
+}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogDO.java
deleted file mode 100644
index a213b5d6..00000000
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogDO.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.github.kfcfans.oms.server.persistence.mongodb;
-
-import lombok.Data;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-import javax.persistence.Id;
-import java.util.List;
-
-/**
- * Runtime log of a job instance
- *
- * @author YuHuaFans (Yu Hua's novels really are good... even if they leave me gloomy for a few days afterwards...)
- * @since 2020/4/27
- */
-@Data
-@Document(collection = "instance_log")
-public class InstanceLogDO {
-
-    @Id
-    private String id;
-
-    private Long instanceId;
-
-    private List<String> logList;
-}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogMetadata.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogMetadata.java
new file mode 100644
index 00000000..5f6383e4
--- /dev/null
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogMetadata.java
@@ -0,0 +1,27 @@
+package com.github.kfcfans.oms.server.persistence.mongodb;
+
+import lombok.Data;
+
+/**
+ * Job instance log metadata
+ *
+ * @author tjq
+ * @since 2020/5/3
+ */
+@Data
+public class InstanceLogMetadata {
+
+    /**
+     * job instance ID
+     */
+    private long instanceId;
+    /**
+     * file size
+     */
+    private long fileSize;
+    /**
+     * creation time (used for conditional deletion)
+     */
+    private long createdTime;
+
+}
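Taken together, `StringPage` and `InstanceLogMetadata` define the new contract: log text is returned one page at a time, and the file pushed to GridFS carries enough metadata for later cleanup. Below is a minimal sketch of how the two `StringPage` construction paths are meant to be used; it is not part of the patch, and `StringPageDemo`, `pageOf`, `message` and the local `MAX_LINE_COUNT` constant are illustrative stand-ins for the logic the service implements further down.

```java
import com.github.kfcfans.oms.server.persistence.StringPage;

import java.util.List;

public class StringPageDemo {

    // stand-in for the page size constant the service defines below
    private static final int MAX_LINE_COUNT = 500;

    // a real page: slice [index * MAX_LINE_COUNT, (index + 1) * MAX_LINE_COUNT) out of all lines
    static StringPage pageOf(List<String> allLines, long index) {
        long left = index * MAX_LINE_COUNT;
        long right = Math.min(allLines.size(), left + MAX_LINE_COUNT);
        long totalPages = (long) Math.ceil(1.0 * allLines.size() / MAX_LINE_COUNT);
        StringBuilder sb = new StringBuilder();
        for (long i = left; i < right; i++) {
            sb.append(allLines.get((int) i)).append(System.lineSeparator());
        }
        return new StringPage(index, totalPages, sb.toString());
    }

    // a one-off message (error text, redirect hint, ...) always fits on a single page
    static StringPage message(String text) {
        return StringPage.simple(text);
    }
}
```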
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java
index aa75f026..a2ac7236 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java
@@ -3,34 +3,42 @@ package com.github.kfcfans.oms.server.service;
 import com.github.kfcfans.oms.common.TimeExpressionType;
 import com.github.kfcfans.oms.common.model.InstanceLogContent;
 import com.github.kfcfans.oms.common.utils.CommonUtils;
+import com.github.kfcfans.oms.server.persistence.StringPage;
 import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
 import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogDO;
 import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogRepository;
-import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogDO;
+import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogMetadata;
 import com.github.kfcfans.oms.server.service.instance.InstanceManager;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.mongodb.gridfs.GridFS;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.GridFSDownloadStream;
+import com.mongodb.client.gridfs.model.GridFSFile;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.time.FastDateFormat;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.mongodb.MongoDbFactory;
 import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.data.mongodb.core.query.Criteria;
 import org.springframework.data.mongodb.core.query.Query;
-import org.springframework.data.mongodb.core.query.Update;
+import org.springframework.data.mongodb.gridfs.GridFsCriteria;
+import org.springframework.data.mongodb.gridfs.GridFsResource;
+import org.springframework.data.mongodb.gridfs.GridFsTemplate;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.CollectionUtils;
+import org.springframework.util.ResourceUtils;
 
 import javax.annotation.Resource;
+import java.io.*;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -44,7 +52,7 @@
 @Service
 public class InstanceLogService {
 
-    private MongoTemplate mongoTemplate;
+    private GridFsTemplate gridFsTemplate;
 
     @Resource
     private LocalInstanceLogRepository localInstanceLogRepository;
@@ -52,14 +60,15 @@ public class InstanceLogService {
     private final Set<Long> instanceIds = Sets.newConcurrentHashSet();
 
     private static final String SPACE = " ";
-    private static final String LINE_SEPARATOR = "\n";
     private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
-    private static final int BATCH_SIZE = 1000;
-    private static final int LOG_AVG_SIZE = 100;
-
     private static final FastDateFormat dateFormat = FastDateFormat.getInstance(TIME_PATTERN);
 
+    // file path
+    private static final String USER_HOME = System.getProperty("user.home", "oms");
+    // number of lines displayed per page
+    private static final int MAX_LINE_COUNT = 500;
+
     /**
      * Submit log records and persist them into the local database
      * @param workerAddress address of the reporting worker
@@ -86,44 +95,80 @@
     /**
      * Fetch the runtime log of a job instance (the data is assumed to exist locally; requests must be routed and forwarded by the server that generated it)
      * @param instanceId job instance ID
+     * @param index page number
      * @return the log text
      */
     @Transactional(readOnly = true)
-    public String fetchInstanceLog(Long instanceId) {
+    public StringPage fetchInstanceLog(Long instanceId, long index) {
+
+        File logFile = new File(genLogFilePath(instanceId));
         try {
             long logCount = localInstanceLogRepository.countByInstanceId(instanceId);
-            // data exists locally, return it directly
+            // build the local log file
            if (logCount != 0) {
+                // rows still exist in the database, so the data is still being updated and the file has to be regenerated
                 Stream<LocalInstanceLogDO> logStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId);
-                int strSize = (int) Math.min(Integer.MAX_VALUE, LOG_AVG_SIZE * logCount);
-                StringBuilder sb = new StringBuilder(strSize);
-                logStream.forEach(instanceLogDO -> sb.append(convertLog(instanceLogDO)).append(LINE_SEPARATOR));
-                return sb.toString();
+                stream2File(logStream, instanceId);
+
+            }else {
+
+                if (gridFsTemplate == null) {
+                    return StringPage.simple("There is no local log for this task now, you need to use mongoDB to store the logs.");
+                }
+
+                // no local file, download it again
+                if (!logFile.exists()) {
+                    GridFsResource gridFsResource = gridFsTemplate.getResource(genLogFileName(instanceId));
+
+                    if (!gridFsResource.exists()) {
+                        return StringPage.simple("There is no online log for this job instance");
+                    }
+
+                    byte[] buffer = new byte[1024];
+                    try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream());
+                         BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile))
+                    ) {
+                        for (int len; (len = gis.read(buffer)) != -1; ) {
+                            bos.write(buffer, 0, len);
+                        }
+                    }
+                }
             }
 
-            if (mongoTemplate == null) {
-                return "There is no local log for this task now, you need to use mongoDB to store the logs.";
+            if (!logFile.exists()) {
+                return StringPage.simple("There is no online log for this task instance now.");
             }
 
-            // fetch from MongoDB
-            InstanceLogDO mongoLog = mongoTemplate.findOne(Query.query(Criteria.where("instanceId").is(instanceId)), InstanceLogDO.class);
-            if (mongoLog == null) {
-                return "There is no online log for this task instance now.";
+            // page through the data for display
+            long lines = 0;
+            StringBuilder sb = new StringBuilder();
+            String lineStr;
+            long left = index * MAX_LINE_COUNT;
+            long right = left + MAX_LINE_COUNT;
+            try (LineNumberReader lr = new LineNumberReader(new FileReader(logFile))) {
+                while ((lineStr = lr.readLine()) != null) {
+
+                    // inside the requested range, collect the line
+                    if (lines >= left && lines < right) {
+                        sb.append(lineStr).append(System.lineSeparator());
+                    }
+                    ++lines;
+                }
+            }catch (Exception e) {
+                log.warn("[InstanceLogService] read logFile from disk failed.", e);
             }
 
-            StringBuilder sb = new StringBuilder(Math.min(Integer.MAX_VALUE, LOG_AVG_SIZE * mongoLog.getLogList().size()));
-            mongoLog.getLogList().forEach(s -> sb.append(s).append(LINE_SEPARATOR));
-            return sb.toString();
+
+            double totalPage = Math.ceil(1.0 * lines / MAX_LINE_COUNT);
+            return new StringPage(index, (long) totalPage, sb.toString());
+
         }catch (Exception e) {
             log.error("[InstanceLogService] fetchInstanceLog for instance(instanceId={}) failed.", instanceId, e);
-            return "unknown error from oms-server";
-        }catch (OutOfMemoryError oe) {
-            log.error("[InstanceLogService] The log for instance(instanceId={}) is too large.", instanceId, oe);
-            return "The log is too large to display directly.";
+            return StringPage.simple("unknown error from oms-server, please see oms-server's log to find the problem");
         }
     }
@@ -143,25 +188,28 @@
 
         Stopwatch sw = Stopwatch.createStarted();
 
-        // push data to MongoDB
-        List<String> instanceLogs = Lists.newLinkedList();
+        if (gridFsTemplate != null) {
 
-        // stream the data to avoid OOM; must survive writing at least 10 million log records (monitor memory usage while testing)
-        if (mongoTemplate != null) {
-            try (Stream<LocalInstanceLogDO> allLogs = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
-                AtomicBoolean initialized = new AtomicBoolean(false);
+            File logFile = new File(genLogFilePath(instanceId));
 
-                // write the whole table into MongoDB
-                allLogs.forEach(instanceLog -> {
-                    instanceLogs.add(convertLog(instanceLog));
-                    if (instanceLogs.size() > BATCH_SIZE) {
-                        saveToMongoDB(instanceId, instanceLogs, initialized);
-                    }
-                });
+            // persist to local disk first
+            try {
+                Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId);
+                stream2File(allLogStream, instanceId);
+            }catch (Exception e) {
+                log.warn("[InstanceLogService] get log stream failed for instance(instanceId={}).", instanceId, e);
+            }
+
+            // push to MongoDB
+            try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(logFile))) {
+
+                InstanceLogMetadata metadata = new InstanceLogMetadata();
+                metadata.setInstanceId(instanceId);
+                metadata.setFileSize(logFile.length());
+                metadata.setCreatedTime(System.currentTimeMillis());
+
+                gridFsTemplate.store(bis, genLogFileName(instanceId), metadata);
 
-                if (!instanceLogs.isEmpty()) {
-                    saveToMongoDB(instanceId, instanceLogs, initialized);
-                }
             }catch (Exception e) {
                 log.warn("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB failed.", instanceId, e);
             }
@@ -178,6 +226,29 @@
         }
     }
 
+    /**
+     * Convert the log stream stored in the database into a log file on disk
+     * @param stream the log stream
+     * @param instanceId job instance ID
+     */
+    @SuppressWarnings("all")
+    private void stream2File(Stream<LocalInstanceLogDO> stream, long instanceId) {
+        File logFile = new File(genLogFilePath(instanceId));
+        logFile.getParentFile().mkdirs();
+        try (FileWriter fw = new FileWriter(logFile); BufferedWriter bfw = new BufferedWriter(fw)) {
+            stream.forEach(instanceLog -> {
+                try {
+                    bfw.write(convertLog(instanceLog) + System.lineSeparator());
+                }catch (Exception ignore) {
+                }
+            });
+        }catch (Exception e) {
+            log.warn("[InstanceLogService] write instanceLog(instanceId={}) to local file failed.", instanceId, e);
+        }finally {
+            stream.close();
+        }
+    }
+
     /**
@@ -189,28 +260,6 @@
         return dateFormat.format(instanceLog.getLogTime()) + SPACE + instanceLog.getWorkerAddress() + SPACE + instanceLog.getLogContent();
     }
 
-    private void saveToMongoDB(Long instanceId, List<String> logList, AtomicBoolean initialized) {
-
-        try {
-            CommonUtils.executeWithRetry0(() -> {
-                if (initialized.get()) {
-                    Query mongoQuery = Query.query(Criteria.where("instanceId").is(instanceId));
-                    Update mongoUpdate = new Update().push("logList").each(logList);
-                    mongoTemplate.updateFirst(mongoQuery, mongoUpdate, InstanceLogDO.class);
-                }else {
-                    InstanceLogDO newInstanceLog = new InstanceLogDO();
-                    newInstanceLog.setInstanceId(instanceId);
-                    newInstanceLog.setLogList(logList);
-                    mongoTemplate.save(newInstanceLog);
-                    initialized.set(true);
-                }
-                logList.clear();
-                return null;
-            });
-        }catch (Exception e) {
-            log.warn("[InstanceLogService] push instanceLog(instanceId={},logList={}) to mongoDB failed.", instanceId, logList, e);
-        }
-    }
 
     @Async("omsTimingPool")
     @Scheduled(fixedDelay = 60000)
@@ -242,9 +291,15 @@
         }
     }
 
-    @Autowired(required = false)
-    public void setMongoTemplate(MongoTemplate mongoTemplate) {
-        this.mongoTemplate = mongoTemplate;
+    private static String genLogFileName(long instanceId) {
+        return String.format("%d.log", instanceId);
+    }
+    private static String genLogFilePath(long instanceId) {
+        return USER_HOME + "/oms/online_log/" + genLogFileName(instanceId);
     }
 
+    @Autowired(required = false)
+    public void setGridFsTemplate(GridFsTemplate gridFsTemplate) {
+        this.gridFsTemplate = gridFsTemplate;
+    }
 }
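The service now works file-first: local log rows are streamed into a plain file under the user home via `stream2File`, that file is pushed to GridFS together with an `InstanceLogMetadata` attachment, and reads pull the file back from GridFS before paging through it line by line. The following is a standalone sketch of that GridFS round trip, not part of the patch: the class and method names are hypothetical, the `<instanceId>.log` naming follows `genLogFileName` above, and a configured `GridFsTemplate` bean is assumed.

```java
import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogMetadata;
import org.springframework.data.mongodb.gridfs.GridFsResource;
import org.springframework.data.mongodb.gridfs.GridFsTemplate;

import java.io.*;

public class GridFsLogRoundTrip {

    // upload a finished local log file, attaching metadata used later for conditional cleanup
    public static void upload(GridFsTemplate gridFsTemplate, File logFile, long instanceId) throws IOException {
        InstanceLogMetadata metadata = new InstanceLogMetadata();
        metadata.setInstanceId(instanceId);
        metadata.setFileSize(logFile.length());
        metadata.setCreatedTime(System.currentTimeMillis());
        try (InputStream in = new BufferedInputStream(new FileInputStream(logFile))) {
            gridFsTemplate.store(in, instanceId + ".log", metadata);
        }
    }

    // download the stored log back to a local file; returns false if it was never uploaded
    public static boolean download(GridFsTemplate gridFsTemplate, File target, long instanceId) throws IOException {
        GridFsResource resource = gridFsTemplate.getResource(instanceId + ".log");
        if (!resource.exists()) {
            return false;
        }
        byte[] buffer = new byte[4096];
        try (InputStream in = resource.getInputStream();
             OutputStream out = new BufferedOutputStream(new FileOutputStream(target))) {
            int len;
            while ((len = in.read(buffer)) != -1) {
                out.write(buffer, 0, len);   // write only the bytes actually read
            }
        }
        return true;
    }
}
```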
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java
index 79963d6b..2b4d3223 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java
@@ -5,6 +5,7 @@ import com.github.kfcfans.oms.common.response.ResultDTO;
 import com.github.kfcfans.oms.common.model.InstanceDetail;
 import com.github.kfcfans.oms.server.akka.OhMyServer;
 import com.github.kfcfans.oms.server.persistence.PageResult;
+import com.github.kfcfans.oms.server.persistence.StringPage;
 import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
 import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
 import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository;
@@ -68,7 +69,7 @@
     }
 
     @GetMapping("/log")
-    public ResultDTO getInstanceLog(Long instanceId, HttpServletResponse response) {
+    public ResultDTO getInstanceLog(Long instanceId, Long index, HttpServletResponse response) {
 
         InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
         if (instanceInfo == null) {
@@ -88,13 +89,13 @@
             String url = "http://" + ip + ":" + port + "/instance/log?instanceId=" + instanceId;
             try {
                 response.sendRedirect(url);
-                return ResultDTO.success("redirecting...");
+                return ResultDTO.success(StringPage.simple("redirecting..."));
             }catch (Exception e) {
                 return ResultDTO.failed(e);
             }
         }
 
-        return ResultDTO.success(instanceLogService.fetchInstanceLog(instanceId));
+        return ResultDTO.success(instanceLogService.fetchInstanceLog(instanceId, index));
     }
 
     @PostMapping("/list")
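With the extra `index` parameter the endpoint returns one `StringPage` per call (bound from `/instance/log?instanceId=...&index=...`), and the `totalPages` field tells the caller when to stop. A sketch of a caller that stitches the full log back together through the service API follows; it is illustration only, and `InstanceLogPager` / `assembleAllLog` are hypothetical names, not part of the patch.

```java
import com.github.kfcfans.oms.server.persistence.StringPage;
import com.github.kfcfans.oms.server.service.InstanceLogService;

public class InstanceLogPager {

    // walk the pages until index reaches totalPages and concatenate the text
    public static String assembleAllLog(InstanceLogService instanceLogService, long instanceId) {
        StringBuilder all = new StringBuilder();
        long index = 0;
        StringPage page;
        do {
            page = instanceLogService.fetchInstanceLog(instanceId, index);
            all.append(page.getData());
            index++;
        } while (index < page.getTotalPages());
        return all.toString();
    }
}
```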