From 1843dc8a602788bd76f2b8b5fd8a21fbdfc53921 Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 29 Apr 2020 22:11:39 +0800 Subject: [PATCH] finished online log --- .../persistence/config/LocalJpaConfig.java | 2 +- .../persistence/local/LocalInstanceLogDO.java | 2 + .../local/LocalInstanceLogRepository.java | 5 ++- .../server/service/InstanceLogService.java | 42 +++++++++---------- .../service/instance/InstanceManager.java | 2 + .../processors/StandaloneProcessorDemo.java | 11 +++++ .../src/main/resources/application.properties | 4 +- .../oms/worker/log/impl/OmsServerLogger.java | 2 +- 8 files changed, 45 insertions(+), 25 deletions(-) diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/config/LocalJpaConfig.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/config/LocalJpaConfig.java index 6ae474e5..bd930091 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/config/LocalJpaConfig.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/config/LocalJpaConfig.java @@ -48,7 +48,7 @@ public class LocalJpaConfig { HibernateProperties hibernateProperties = new HibernateProperties(); // 考虑要不要用 create 模式,每次启动都删除数据 - hibernateProperties.setDdlAuto("update"); + hibernateProperties.setDdlAuto("create"); return hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings()); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogDO.java index 3760959e..2ac16c23 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogDO.java @@ -31,6 +31,8 @@ public class LocalInstanceLogDO { /** * 日志内容 */ + @Lob + @Column(columnDefinition="TEXT") private String logContent; /** diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java index 9c97d60a..6da44241 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java @@ -1,8 +1,9 @@ package com.github.kfcfans.oms.server.persistence.local; import org.springframework.data.jpa.repository.JpaRepository; -import org.springframework.transaction.annotation.Transactional; +import org.springframework.data.jpa.repository.Modifying; +import javax.transaction.Transactional; import java.util.List; import java.util.stream.Stream; @@ -18,6 +19,8 @@ public interface LocalInstanceLogRepository extends JpaRepository findByInstanceIdOrderByLogTime(Long instanceId); // 删除数据 + @Modifying + @Transactional long deleteByInstanceId(Long instanceId); long deleteByInstanceIdInAndLogTimeLessThan(List instanceIds, Long t); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java index 9254f41c..955e1c0c 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -108,7 +107,7 @@ public class InstanceLogService { // 从 MongoDB 获取 InstanceLogDO mongoLog = mongoTemplate.findOne(Query.query(Criteria.where("instanceId").is(instanceId)), InstanceLogDO.class); if (mongoLog == null) { - return "There is no online log for this task instance"; + return "There is no online log for this task instance now."; } StringBuilder sb = new StringBuilder(Math.min(Integer.MAX_VALUE, LOG_AVG_SIZE * mongoLog.getLogList().size())); mongoLog.getLogList().forEach(s -> sb.append(s).append(LINE_SEPARATOR)); @@ -127,8 +126,8 @@ public class InstanceLogService { * 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行 * @param instanceId 任务实例ID */ + @Transactional @Async("commonTaskExecutor") - @Transactional(readOnly = true) public void sync(Long instanceId) { // 休眠10秒等待全部数据上报(OmsLogHandler 每隔5秒上报数据) @@ -139,42 +138,43 @@ public class InstanceLogService { Stopwatch sw = Stopwatch.createStarted(); + // 推送数据到 mongoDB + List instanceLogs = Lists.newLinkedList(); // 流式操作避免 OOM,至少要扛住 1000W 条日志记录的写入(需要测试时监控内存变化) - Stream allLogs = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId); + try (Stream allLogs = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) { + AtomicBoolean initialized = new AtomicBoolean(false); - List instanceLogs = Lists.newLinkedList(); - AtomicLong counter = new AtomicLong(0); - AtomicBoolean initialized = new AtomicBoolean(false); + // 将整库数据写入 MongoDB + allLogs.forEach(instanceLog -> { + instanceLogs.add(convertLog(instanceLog)); + if (instanceLogs.size() > BATCH_SIZE) { + saveToMongoDB(instanceId, instanceLogs, initialized); + } + }); - // 将整库数据写入 MongoDB - allLogs.forEach(instanceLog -> { - counter.incrementAndGet(); - - instanceLogs.add(convertLog(instanceLog)); - - if (instanceLogs.size() > BATCH_SIZE) { + if (!instanceLogs.isEmpty()) { saveToMongoDB(instanceId, instanceLogs, initialized); } - }); - - if (!instanceLogs.isEmpty()) { - saveToMongoDB(instanceId, instanceLogs, initialized); + }catch (Exception e) { + log.warn("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB failed.", instanceId, e); } // 删除本地数据 try { - CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); + long total = CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); instanceIds.remove(instanceId); - log.debug("[InstanceLogService] sync local instanceLogs to mongoDB succeed, total logs: {},using: {}.", counter.get(), sw.stop()); + log.info("[InstanceLogService] sync local instanceLogs(instanceId={}) to mongoDB succeed, total logs: {},using: {}.", instanceId, total, sw.stop()); }catch (Exception e) { log.warn("[InstanceLogService] delete local instanceLogs failed.", e); } } + + /** - * 拼接日志 -> 2019-4-21 00:00:00.000 192.168.1.1:2777 INFO XXX + * 拼接日志 -> 2020-04-29 22:07:10.059 192.168.1.1:2777 INFO XXX * @param instanceLog 日志对象 * @return 字符串 */ diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java index 9466d091..eb40fb09 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java @@ -146,6 +146,8 @@ public class InstanceManager { */ public static void processFinishedInstance(Long instanceId) { + log.info("[InstanceManager] instance(id={}) process finished.", instanceId); + // 清除已完成的实例信息 instanceId2StatusHolder.remove(instanceId); // 这一步也可能导致后面取不到 JobInfoDO diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java index ee205d0a..0385bc52 100644 --- a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java @@ -7,6 +7,8 @@ import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.Collections; + /** * 单机处理器 示例 * com.github.kfcfans.oms.server.processors.StandaloneProcessorDemo @@ -28,6 +30,15 @@ public class StandaloneProcessorDemo implements BasicProcessor { System.out.println("TaskContext: " + JSONObject.toJSONString(context)); System.out.println("ProcessSuccess: " + success); context.getOmsLogger().info("StandaloneProcessorDemo finished process,success: .", success); + + // 测试异常日志 + try { + Collections.emptyList().add("277"); + }catch (Exception e) { + context.getOmsLogger().error("[StandaloneProcessorDemo] process failed.", e); + } + + return new ProcessResult(success, context + ": " + success); } } diff --git a/oh-my-scheduler-worker-samples/src/main/resources/application.properties b/oh-my-scheduler-worker-samples/src/main/resources/application.properties index bafddced..8e87e783 100644 --- a/oh-my-scheduler-worker-samples/src/main/resources/application.properties +++ b/oh-my-scheduler-worker-samples/src/main/resources/application.properties @@ -1 +1,3 @@ -server.port=8081 \ No newline at end of file +server.port=8081 + +spring.jpa.open-in-view=false \ No newline at end of file diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/log/impl/OmsServerLogger.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/log/impl/OmsServerLogger.java index 532a54a9..a19d59bb 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/log/impl/OmsServerLogger.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/log/impl/OmsServerLogger.java @@ -52,7 +52,7 @@ public class OmsServerLogger implements OmsLogger { private static String genLog(String level, String messagePattern, Object... arg) { String pattern = LOG_PREFIX + messagePattern; - Object[] newArgs = new Object[arg.length + 2]; + Object[] newArgs = new Object[arg.length + 1]; newArgs[0] = level; System.arraycopy(arg, 0, newArgs, 1, arg.length);