basic omsLogger v1.0.1

This commit is contained in:
tjq 2020-05-03 21:48:12 +08:00
parent 635140177d
commit d74a767733
5 changed files with 192 additions and 97 deletions

View File

@ -0,0 +1,37 @@
package com.github.kfcfans.oms.server.persistence;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 文本分页
*
* @author tjq
* @since 2020/5/3
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class StringPage {
/**
* 当前页数
*/
private long index;
/**
* 总页数
*/
private long totalPages;
/**
* 文本数据
*/
private String data;
public static StringPage simple(String data) {
StringPage sp = new StringPage();
sp.index = 0;
sp.totalPages = 1;
sp.data = data;
return sp;
}
}

View File

@ -1,25 +0,0 @@
package com.github.kfcfans.oms.server.persistence.mongodb;
import lombok.Data;
import org.springframework.data.mongodb.core.mapping.Document;
import javax.persistence.Id;
import java.util.List;
/**
* 任务实例的运行时日志
*
* @author YuHuaFans余华的小说确实挺好看的...虽然看完总是要忧郁几天...
* @since 2020/4/27
*/
@Data
@Document(collection = "instance_log")
public class InstanceLogDO {
@Id
private String id;
private Long instanceId;
private List<String> logList;
}

View File

@ -0,0 +1,27 @@
package com.github.kfcfans.oms.server.persistence.mongodb;
import lombok.Data;
/**
* 任务日志元数据
*
* @author tjq
* @since 2020/5/3
*/
@Data
public class InstanceLogMetadata {
/**
* 任务实例ID
*/
private long instanceId;
/**
* 文件大小
*/
private long fileSize;
/**
* 创建时间用于条件删除
*/
private long createdTime;
}

View File

@ -3,34 +3,42 @@ package com.github.kfcfans.oms.server.service;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.model.InstanceLogContent;
import com.github.kfcfans.oms.common.utils.CommonUtils;
import com.github.kfcfans.oms.server.persistence.StringPage;
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogDO;
import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogRepository;
import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogDO;
import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogMetadata;
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.mongodb.gridfs.GridFS;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import com.mongodb.client.gridfs.model.GridFSFile;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.FastDateFormat;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.gridfs.GridFsCriteria;
import org.springframework.data.mongodb.gridfs.GridFsResource;
import org.springframework.data.mongodb.gridfs.GridFsTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ResourceUtils;
import javax.annotation.Resource;
import java.io.*;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -44,7 +52,7 @@ import java.util.stream.Stream;
@Service
public class InstanceLogService {
private MongoTemplate mongoTemplate;
private GridFsTemplate gridFsTemplate;
@Resource
private LocalInstanceLogRepository localInstanceLogRepository;
@ -52,14 +60,15 @@ public class InstanceLogService {
private final Set<Long> instanceIds = Sets.newConcurrentHashSet();
private static final String SPACE = " ";
private static final String LINE_SEPARATOR = "\n";
private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
private static final int BATCH_SIZE = 1000;
private static final int LOG_AVG_SIZE = 100;
private static final FastDateFormat dateFormat = FastDateFormat.getInstance(TIME_PATTERN);
// 文件路径
private static final String USER_HOME = System.getProperty("user.home", "oms");
// 每一个展示的行数
private static final int MAX_LINE_COUNT = 500;
/**
* 提交日志记录持久化到本地数据库中
* @param workerAddress 上报机器地址
@ -86,44 +95,80 @@ public class InstanceLogService {
/**
* 获取任务实例运行日志默认存在本地数据需要由生成完成请求的路由与转发
* @param instanceId 任务实例ID
* @param index 页码
* @return 文本字符串
*/
@Transactional(readOnly = true)
public String fetchInstanceLog(Long instanceId) {
public StringPage fetchInstanceLog(Long instanceId, long index) {
File logFile = new File(genLogFilePath(instanceId));
try {
long logCount = localInstanceLogRepository.countByInstanceId(instanceId);
// 本地存在数据直接返回
// 构建本地日志文件
if (logCount != 0) {
// 数据库中存在说明数据还在更新中需要重新生成
Stream<LocalInstanceLogDO> logStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId);
int strSize = (int) Math.min(Integer.MAX_VALUE, LOG_AVG_SIZE * logCount);
StringBuilder sb = new StringBuilder(strSize);
logStream.forEach(instanceLogDO -> sb.append(convertLog(instanceLogDO)).append(LINE_SEPARATOR));
return sb.toString();
stream2File(logStream, instanceId);
}else {
if (gridFsTemplate == null) {
return StringPage.simple("There is no local log for this task now, you need to use mongoDB to store the logs.");
}
// 不存在需要重新下载
if (!logFile.exists()) {
GridFsResource gridFsResource = gridFsTemplate.getResource(genLogFileName(instanceId));
if (!gridFsResource.exists()) {
return StringPage.simple("There is no online log for this job instance");
}
byte[] buffer = new byte[1024];
try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream());
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile))
) {
while (gis.read(buffer) != -1) {
bos.write(buffer);
}
}
}
}
if (mongoTemplate == null) {
return "There is no local log for this task now, you need to use mongoDB to store the logs.";
if (!logFile.exists()) {
return StringPage.simple("There is no online log for this task instance now.");
}
// MongoDB 获取
InstanceLogDO mongoLog = mongoTemplate.findOne(Query.query(Criteria.where("instanceId").is(instanceId)), InstanceLogDO.class);
if (mongoLog == null) {
return "There is no online log for this task instance now.";
// 分页展示数据
long lines = 0;
StringBuilder sb = new StringBuilder();
String lineStr;
long left = index * MAX_LINE_COUNT;
long right = left + MAX_LINE_COUNT;
try (LineNumberReader lr = new LineNumberReader(new FileReader(logFile))) {
while ((lineStr = lr.readLine()) != null) {
// 指定范围内读出
if (lines >= left && lines < right) {
sb.append(lineStr).append(System.lineSeparator());
}
++lines;
}
}catch (Exception e) {
log.warn("[InstanceLogService] read logFile from disk failed.", e);
}
StringBuilder sb = new StringBuilder(Math.min(Integer.MAX_VALUE, LOG_AVG_SIZE * mongoLog.getLogList().size()));
mongoLog.getLogList().forEach(s -> sb.append(s).append(LINE_SEPARATOR));
return sb.toString();
double totalPage = Math.ceil(1.0 * lines / MAX_LINE_COUNT);
return new StringPage(index, (long) totalPage, sb.toString());
}catch (Exception e) {
log.error("[InstanceLogService] fetchInstanceLog for instance(instanceId={}) failed.", instanceId, e);
return "unknown error from oms-server";
}catch (OutOfMemoryError oe) {
log.error("[InstanceLogService] The log for instance(instanceId={}) is too large.", instanceId, oe);
return "The log is too large to display directly.";
return StringPage.simple("unknown error from oms-server, please see oms-server's log to find the problem");
}
}
@ -143,25 +188,28 @@ public class InstanceLogService {
Stopwatch sw = Stopwatch.createStarted();
// 推送数据到 mongoDB
List<String> instanceLogs = Lists.newLinkedList();
if (gridFsTemplate != null) {
// 流式操作避免 OOM至少要扛住 1000W 条日志记录的写入需要测试时监控内存变化
if (mongoTemplate != null) {
try (Stream<LocalInstanceLogDO> allLogs = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
AtomicBoolean initialized = new AtomicBoolean(false);
File logFile = new File(genLogFilePath(instanceId));
// 将整库数据写入 MongoDB
allLogs.forEach(instanceLog -> {
instanceLogs.add(convertLog(instanceLog));
if (instanceLogs.size() > BATCH_SIZE) {
saveToMongoDB(instanceId, instanceLogs, initialized);
}
});
// 先持久化到本地磁盘
try {
Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId);
stream2File(allLogStream, instanceId);
}catch (Exception e) {
log.warn("[InstanceLogService] get log stream failed for instance(instanceId={}).", instanceId, e);
}
// 推送到 mongoDB
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(logFile))) {
InstanceLogMetadata metadata = new InstanceLogMetadata();
metadata.setInstanceId(instanceId);
metadata.setFileSize(logFile.length());
metadata.setCreatedTime(System.currentTimeMillis());
gridFsTemplate.store(bis, genLogFileName(instanceId), metadata);
if (!instanceLogs.isEmpty()) {
saveToMongoDB(instanceId, instanceLogs, initialized);
}
}catch (Exception e) {
log.warn("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB failed.", instanceId, e);
}
@ -178,6 +226,29 @@ public class InstanceLogService {
}
}
/**
* 将数据库中存储的日志流转化为磁盘日志文件
* @param stream
* @param instanceId 任务实例ID
*/
@SuppressWarnings("all")
private void stream2File(Stream<LocalInstanceLogDO> stream, long instanceId) {
File logFile = new File(genLogFilePath(instanceId));
logFile.getParentFile().mkdirs();
try (FileWriter fw = new FileWriter(logFile); BufferedWriter bfw = new BufferedWriter(fw)) {
stream.forEach(instanceLog -> {
try {
bfw.write(convertLog(instanceLog) + System.lineSeparator());
}catch (Exception ignore) {
}
});
}catch (Exception e) {
log.warn("[InstanceLogService] write instanceLog(instanceId={}) to local file failed.", instanceId, e);
}finally {
stream.close();
}
}
/**
@ -189,28 +260,6 @@ public class InstanceLogService {
return dateFormat.format(instanceLog.getLogTime()) + SPACE + instanceLog.getWorkerAddress() + SPACE + instanceLog.getLogContent();
}
private void saveToMongoDB(Long instanceId, List<String> logList, AtomicBoolean initialized) {
try {
CommonUtils.executeWithRetry0(() -> {
if (initialized.get()) {
Query mongoQuery = Query.query(Criteria.where("instanceId").is(instanceId));
Update mongoUpdate = new Update().push("logList").each(logList);
mongoTemplate.updateFirst(mongoQuery, mongoUpdate, InstanceLogDO.class);
}else {
InstanceLogDO newInstanceLog = new InstanceLogDO();
newInstanceLog.setInstanceId(instanceId);
newInstanceLog.setLogList(logList);
mongoTemplate.save(newInstanceLog);
initialized.set(true);
}
logList.clear();
return null;
});
}catch (Exception e) {
log.warn("[InstanceLogService] push instanceLog(instanceId={},logList={}) to mongoDB failed.", instanceId, logList, e);
}
}
@Async("omsTimingPool")
@Scheduled(fixedDelay = 60000)
@ -242,9 +291,15 @@ public class InstanceLogService {
}
}
@Autowired(required = false)
public void setMongoTemplate(MongoTemplate mongoTemplate) {
this.mongoTemplate = mongoTemplate;
private static String genLogFileName(long instanceId) {
return String.format("%d.log", instanceId);
}
private static String genLogFilePath(long instanceId) {
return USER_HOME + "/oms/online_log/" + genLogFileName(instanceId);
}
@Autowired(required = false)
public void setGridFsTemplate(GridFsTemplate gridFsTemplate) {
this.gridFsTemplate = gridFsTemplate;
}
}

View File

@ -5,6 +5,7 @@ import com.github.kfcfans.oms.common.response.ResultDTO;
import com.github.kfcfans.oms.common.model.InstanceDetail;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.PageResult;
import com.github.kfcfans.oms.server.persistence.StringPage;
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository;
@ -68,7 +69,7 @@ public class InstanceController {
}
@GetMapping("/log")
public ResultDTO<String> getInstanceLog(Long instanceId, HttpServletResponse response) {
public ResultDTO<StringPage> getInstanceLog(Long instanceId, Long index, HttpServletResponse response) {
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfo == null) {
@ -88,13 +89,13 @@ public class InstanceController {
String url = "http://" + ip + ":" + port + "/instance/log?instanceId=" + instanceId;
try {
response.sendRedirect(url);
return ResultDTO.success("redirecting...");
return ResultDTO.success(StringPage.simple("redirecting..."));
}catch (Exception e) {
return ResultDTO.failed(e);
}
}
return ResultDTO.success(instanceLogService.fetchInstanceLog(instanceId));
return ResultDTO.success(instanceLogService.fetchInstanceLog(instanceId, index));
}
@PostMapping("/list")