From 624eba41ef4522939b0fc63690bf7d1eeff0b81a Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 28 Apr 2020 13:03:52 +0800 Subject: [PATCH] sync local logs to mongoDO --- .../common/model/InstanceLogContent.java | 4 +- .../common/request/WorkerLogReportReq.java | 1 + .../oms/server/akka/actors/ServerActor.java | 11 +- .../persistence/local/LocalInstanceLogDO.java | 9 +- .../local/LocalInstanceLogRepository.java | 8 ++ .../persistence/mongodb/InstanceLogDO.java | 10 +- .../mongodb/InstanceLogRepository.java | 12 -- .../server/service/InstanceLogService.java | 134 ++++++++++++++++++ .../src/main/resources/application.properties | 1 - .../oms/worker/background/OmsLogHandler.java | 4 +- 10 files changed, 172 insertions(+), 22 deletions(-) delete mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogRepository.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceLogContent.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceLogContent.java index 9a2237ca..81bc07d0 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceLogContent.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceLogContent.java @@ -19,8 +19,8 @@ public class InstanceLogContent implements OmsSerializable { // 实例ID private long instanceId; // 日志提交时间 - private long timestamp; + private long logTime; // 日志内容 - private String content; + private String logContent; } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerLogReportReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerLogReportReq.java index c60a149f..e0a9e4ba 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerLogReportReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerLogReportReq.java @@ -18,5 +18,6 @@ import java.util.List; @NoArgsConstructor @AllArgsConstructor public class WorkerLogReportReq implements OmsSerializable { + private String workerAddress; private List instanceLogContents; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java index d4eec769..4d500bfa 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java @@ -4,8 +4,10 @@ import akka.actor.AbstractActor; import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.request.WorkerHeartbeat; +import com.github.kfcfans.common.request.WorkerLogReportReq; import com.github.kfcfans.common.response.AskResponse; -import com.github.kfcfans.oms.server.akka.requests.Ping; +import com.github.kfcfans.oms.server.common.utils.SpringUtils; +import com.github.kfcfans.oms.server.service.InstanceLogService; import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; import lombok.extern.slf4j.Slf4j; @@ -24,12 +26,12 @@ public class ServerActor extends AbstractActor { return receiveBuilder() .match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat) .match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq) + .match(WorkerLogReportReq.class, this::onReceiveWorkerLogReportReq) .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) .build(); } - /** * 处理 Worker 的心跳请求 * @param heartbeat 心跳包 @@ -54,4 +56,9 @@ public class ServerActor extends AbstractActor { log.error("[ServerActor] update instance status failed for request: {}.", req, e); } } + + private void onReceiveWorkerLogReportReq(WorkerLogReportReq req) { + // 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧... + SpringUtils.getBean(InstanceLogService.class).submitLogs(req.getWorkerAddress(), req.getInstanceLogContents()); + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogDO.java index 0beae055..3760959e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogDO.java @@ -27,9 +27,14 @@ public class LocalInstanceLogDO { /** * 日志时间 */ - private Long timestamp; + private Long logTime; /** * 日志内容 */ - private String content; + private String logContent; + + /** + * 机器地址 + */ + private String workerAddress; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java index df872902..bf99d3f7 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.server.persistence.local; import org.springframework.data.jpa.repository.JpaRepository; +import java.util.stream.Stream; /** * 本地运行时日志数据操作层 @@ -9,4 +10,11 @@ import org.springframework.data.jpa.repository.JpaRepository; * @since 2020/4/27 */ public interface LocalInstanceLogRepository extends JpaRepository { + + // 流式查询 + Stream findByInstanceIdOrderByLogTime(Long instanceId); + + // 删除数据 + long deleteByInstanceId(Long instanceId); + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogDO.java index 98cf6933..79afc054 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogDO.java @@ -1,6 +1,10 @@ package com.github.kfcfans.oms.server.persistence.mongodb; +import lombok.Data; +import org.springframework.data.mongodb.core.mapping.Document; + import javax.persistence.Id; +import java.util.List; /** * 任务实例的运行时日志 @@ -8,10 +12,14 @@ import javax.persistence.Id; * @author tjq * @since 2020/4/27 */ +@Data +@Document(collection = "instance_log") public class InstanceLogDO { @Id private String id; - private String log; + private Long instanceId; + + private List logList; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogRepository.java deleted file mode 100644 index 6e2c4a00..00000000 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogRepository.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.github.kfcfans.oms.server.persistence.mongodb; - -import org.springframework.data.mongodb.repository.MongoRepository; - -/** - * 任务实例的运行时日志 MongoDB数据操作 - * - * @author tjq - * @since 2020/4/27 - */ -public interface InstanceLogRepository extends MongoRepository { -} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java new file mode 100644 index 00000000..a6756a7f --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java @@ -0,0 +1,134 @@ +package com.github.kfcfans.oms.server.service; + +import com.github.kfcfans.common.model.InstanceLogContent; +import com.github.kfcfans.common.utils.CommonUtils; +import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogDO; +import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogRepository; +import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogDO; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.FastDateFormat; +import org.springframework.beans.BeanUtils; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * 任务实例运行时日志服务 + * + * @author tjq + * @since 2020/4/27 + */ +@Slf4j +@Service +public class InstanceLogService { + + @Resource + private MongoTemplate mongoTemplate; + @Resource + private LocalInstanceLogRepository localInstanceLogRepository; + + private static final String SPACE = " "; + private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS"; + + private static final int BATCH_SIZE = 1000; + + /** + * 提交日志记录,持久化到本地数据库中 + * @param workerAddress 上报机器地址 + * @param logs 任务实例运行时日志 + */ + public void submitLogs(String workerAddress, List logs) { + + List logList = logs.stream().map(x -> { + LocalInstanceLogDO y = new LocalInstanceLogDO(); + BeanUtils.copyProperties(x, y); + y.setWorkerAddress(workerAddress); + return y; + }).collect(Collectors.toList()); + + try { + CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList)); + }catch (Exception e) { + log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e); + } + } + + /** + * 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行 + * @param instanceId 任务实例ID + */ + @Async + public void sync(Long instanceId) { + + Stopwatch sw = Stopwatch.createStarted(); + FastDateFormat dateFormat = FastDateFormat.getInstance(TIME_PATTERN); + + // 流式操作避免 OOM,至少要扛住 1000W 条日志记录的写入(需要测试时监控内存变化) + Stream allLogs = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId); + + List instanceLogs = Lists.newLinkedList(); + AtomicLong counter = new AtomicLong(0); + AtomicBoolean initialized = new AtomicBoolean(false); + + // 将整库数据写入 MongoDB + allLogs.forEach(instanceLog -> { + counter.incrementAndGet(); + + // 拼接日志 -> 2019-4-21 00:00:00.000 192.168.1.1:2777 INFO XXX + String logStr = dateFormat.format(instanceLog.getLogTime()) + SPACE + instanceLog.getWorkerAddress() + SPACE + instanceLog.getLogContent(); + instanceLogs.add(logStr); + + if (instanceLogs.size() > BATCH_SIZE) { + saveToMongoDB(instanceId, instanceLogs, initialized); + } + }); + + if (!instanceLogs.isEmpty()) { + saveToMongoDB(instanceId, instanceLogs, initialized); + } + + // 删除本地数据 + try { + CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); + }catch (Exception e) { + log.warn("[InstanceLogService] delete local instanceLogs failed.", e); + } + + log.debug("[InstanceLogService] sync local instanceLogs to mongoDB succeed, total logs: {},using: {}.", counter.get(), sw.stop()); + } + + private void saveToMongoDB(Long instanceId, List logList, AtomicBoolean initialized) { + + try { + CommonUtils.executeWithRetry0(() -> { + if (initialized.get()) { + Query mongoQuery = Query.query(Criteria.where("instanceId").is(instanceId)); + Update mongoUpdate = new Update().push("logList").each(logList); + mongoTemplate.updateFirst(mongoQuery, mongoUpdate, InstanceLogDO.class); + }else { + InstanceLogDO newInstanceLog = new InstanceLogDO(); + newInstanceLog.setInstanceId(instanceId); + newInstanceLog.setLogList(logList); + mongoTemplate.save(newInstanceLog); + initialized.set(true); + } + logList.clear(); + return null; + }); + }catch (Exception e) { + log.warn("[InstanceLogService] push instanceLog(instanceId={},logList={}) to mongoDB failed.", instanceId, logList, e); + } + } +} diff --git a/oh-my-scheduler-server/src/main/resources/application.properties b/oh-my-scheduler-server/src/main/resources/application.properties index 114b8aaf..911dd277 100644 --- a/oh-my-scheduler-server/src/main/resources/application.properties +++ b/oh-my-scheduler-server/src/main/resources/application.properties @@ -3,7 +3,6 @@ server.port=7700 ####### database config ####### spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver -# JDBC配置不支持utf8mb4,需要更改my.conf spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/oms?useUnicode=true&characterEncoding=UTF-8 spring.datasource.core.username=root spring.datasource.core.password=No1Bug2Please3! diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/OmsLogHandler.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/OmsLogHandler.java index df39b561..61bdc5aa 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/OmsLogHandler.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/OmsLogHandler.java @@ -76,7 +76,7 @@ public class OmsLogHandler { logs.add(logContent); if (logs.size() >= BATCH_SIZE) { - WorkerLogReportReq req = new WorkerLogReportReq(logs); + WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), logs); // 不可靠请求,WEB日志不追求极致 serverActor.tell(req, null); logs.clear(); @@ -88,7 +88,7 @@ public class OmsLogHandler { } if (!logs.isEmpty()) { - WorkerLogReportReq req = new WorkerLogReportReq(logs); + WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), logs); serverActor.tell(req, null); } }