sync local logs to mongoDO

This commit is contained in:
tjq 2020-04-28 13:03:52 +08:00
parent af28e467ea
commit 624eba41ef
10 changed files with 172 additions and 22 deletions

View File

@ -19,8 +19,8 @@ public class InstanceLogContent implements OmsSerializable {
// 实例ID
private long instanceId;
// 日志提交时间
private long timestamp;
private long logTime;
// 日志内容
private String content;
private String logContent;
}

View File

@ -18,5 +18,6 @@ import java.util.List;
@NoArgsConstructor
@AllArgsConstructor
public class WorkerLogReportReq implements OmsSerializable {
private String workerAddress;
private List<InstanceLogContent> instanceLogContents;
}

View File

@ -4,8 +4,10 @@ import akka.actor.AbstractActor;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.common.request.WorkerLogReportReq;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.server.akka.requests.Ping;
import com.github.kfcfans.oms.server.common.utils.SpringUtils;
import com.github.kfcfans.oms.server.service.InstanceLogService;
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import lombok.extern.slf4j.Slf4j;
@ -24,12 +26,12 @@ public class ServerActor extends AbstractActor {
return receiveBuilder()
.match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat)
.match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq)
.match(WorkerLogReportReq.class, this::onReceiveWorkerLogReportReq)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build();
}
/**
* 处理 Worker 的心跳请求
* @param heartbeat 心跳包
@ -54,4 +56,9 @@ public class ServerActor extends AbstractActor {
log.error("[ServerActor] update instance status failed for request: {}.", req, e);
}
}
private void onReceiveWorkerLogReportReq(WorkerLogReportReq req) {
// 这个效率应该不会拉垮吧...也就是一些判断 + Map#get ...
SpringUtils.getBean(InstanceLogService.class).submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
}
}

View File

@ -27,9 +27,14 @@ public class LocalInstanceLogDO {
/**
* 日志时间
*/
private Long timestamp;
private Long logTime;
/**
* 日志内容
*/
private String content;
private String logContent;
/**
* 机器地址
*/
private String workerAddress;
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.server.persistence.local;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.stream.Stream;
/**
* 本地运行时日志数据操作层
@ -9,4 +10,11 @@ import org.springframework.data.jpa.repository.JpaRepository;
* @since 2020/4/27
*/
public interface LocalInstanceLogRepository extends JpaRepository<LocalInstanceLogDO, Long> {
// 流式查询
Stream<LocalInstanceLogDO> findByInstanceIdOrderByLogTime(Long instanceId);
// 删除数据
long deleteByInstanceId(Long instanceId);
}

View File

@ -1,6 +1,10 @@
package com.github.kfcfans.oms.server.persistence.mongodb;
import lombok.Data;
import org.springframework.data.mongodb.core.mapping.Document;
import javax.persistence.Id;
import java.util.List;
/**
* 任务实例的运行时日志
@ -8,10 +12,14 @@ import javax.persistence.Id;
* @author tjq
* @since 2020/4/27
*/
@Data
@Document(collection = "instance_log")
public class InstanceLogDO {
@Id
private String id;
private String log;
private Long instanceId;
private List<String> logList;
}

View File

@ -1,12 +0,0 @@
package com.github.kfcfans.oms.server.persistence.mongodb;
import org.springframework.data.mongodb.repository.MongoRepository;
/**
* 任务实例的运行时日志 MongoDB数据操作
*
* @author tjq
* @since 2020/4/27
*/
public interface InstanceLogRepository extends MongoRepository<InstanceLogDO, String> {
}

View File

@ -0,0 +1,134 @@
package com.github.kfcfans.oms.server.service;
import com.github.kfcfans.common.model.InstanceLogContent;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogDO;
import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogRepository;
import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogDO;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.FastDateFormat;
import org.springframework.beans.BeanUtils;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 任务实例运行时日志服务
*
* @author tjq
* @since 2020/4/27
*/
@Slf4j
@Service
public class InstanceLogService {
@Resource
private MongoTemplate mongoTemplate;
@Resource
private LocalInstanceLogRepository localInstanceLogRepository;
private static final String SPACE = " ";
private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
private static final int BATCH_SIZE = 1000;
/**
* 提交日志记录持久化到本地数据库中
* @param workerAddress 上报机器地址
* @param logs 任务实例运行时日志
*/
public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {
List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
LocalInstanceLogDO y = new LocalInstanceLogDO();
BeanUtils.copyProperties(x, y);
y.setWorkerAddress(workerAddress);
return y;
}).collect(Collectors.toList());
try {
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
}catch (Exception e) {
log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
}
}
/**
* 将本地的任务实例运行日志同步到 mongoDB 存储在任务执行结束后异步执行
* @param instanceId 任务实例ID
*/
@Async
public void sync(Long instanceId) {
Stopwatch sw = Stopwatch.createStarted();
FastDateFormat dateFormat = FastDateFormat.getInstance(TIME_PATTERN);
// 流式操作避免 OOM至少要扛住 1000W 条日志记录的写入需要测试时监控内存变化
Stream<LocalInstanceLogDO> allLogs = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId);
List<String> instanceLogs = Lists.newLinkedList();
AtomicLong counter = new AtomicLong(0);
AtomicBoolean initialized = new AtomicBoolean(false);
// 将整库数据写入 MongoDB
allLogs.forEach(instanceLog -> {
counter.incrementAndGet();
// 拼接日志 -> 2019-4-21 00:00:00.000 192.168.1.1:2777 INFO XXX
String logStr = dateFormat.format(instanceLog.getLogTime()) + SPACE + instanceLog.getWorkerAddress() + SPACE + instanceLog.getLogContent();
instanceLogs.add(logStr);
if (instanceLogs.size() > BATCH_SIZE) {
saveToMongoDB(instanceId, instanceLogs, initialized);
}
});
if (!instanceLogs.isEmpty()) {
saveToMongoDB(instanceId, instanceLogs, initialized);
}
// 删除本地数据
try {
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
}catch (Exception e) {
log.warn("[InstanceLogService] delete local instanceLogs failed.", e);
}
log.debug("[InstanceLogService] sync local instanceLogs to mongoDB succeed, total logs: {},using: {}.", counter.get(), sw.stop());
}
private void saveToMongoDB(Long instanceId, List<String> logList, AtomicBoolean initialized) {
try {
CommonUtils.executeWithRetry0(() -> {
if (initialized.get()) {
Query mongoQuery = Query.query(Criteria.where("instanceId").is(instanceId));
Update mongoUpdate = new Update().push("logList").each(logList);
mongoTemplate.updateFirst(mongoQuery, mongoUpdate, InstanceLogDO.class);
}else {
InstanceLogDO newInstanceLog = new InstanceLogDO();
newInstanceLog.setInstanceId(instanceId);
newInstanceLog.setLogList(logList);
mongoTemplate.save(newInstanceLog);
initialized.set(true);
}
logList.clear();
return null;
});
}catch (Exception e) {
log.warn("[InstanceLogService] push instanceLog(instanceId={},logList={}) to mongoDB failed.", instanceId, logList, e);
}
}
}

View File

@ -3,7 +3,6 @@ server.port=7700
####### database config #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
# JDBC配置不支持utf8mb4需要更改my.conf
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/oms?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!

View File

@ -76,7 +76,7 @@ public class OmsLogHandler {
logs.add(logContent);
if (logs.size() >= BATCH_SIZE) {
WorkerLogReportReq req = new WorkerLogReportReq(logs);
WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), logs);
// 不可靠请求WEB日志不追求极致
serverActor.tell(req, null);
logs.clear();
@ -88,7 +88,7 @@ public class OmsLogHandler {
}
if (!logs.isEmpty()) {
WorkerLogReportReq req = new WorkerLogReportReq(logs);
WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), logs);
serverActor.tell(req, null);
}
}