finished online log

This commit is contained in:
tjq 2020-04-29 22:11:39 +08:00
parent 184dbff952
commit 1843dc8a60
8 changed files with 45 additions and 25 deletions

View File

@ -48,7 +48,7 @@ public class LocalJpaConfig {
HibernateProperties hibernateProperties = new HibernateProperties(); HibernateProperties hibernateProperties = new HibernateProperties();
// 考虑要不要用 create 模式每次启动都删除数据 // 考虑要不要用 create 模式每次启动都删除数据
hibernateProperties.setDdlAuto("update"); hibernateProperties.setDdlAuto("create");
return hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings()); return hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings());
} }

View File

@ -31,6 +31,8 @@ public class LocalInstanceLogDO {
/** /**
* 日志内容 * 日志内容
*/ */
@Lob
@Column(columnDefinition="TEXT")
private String logContent; private String logContent;
/** /**

View File

@ -1,8 +1,9 @@
package com.github.kfcfans.oms.server.persistence.local; package com.github.kfcfans.oms.server.persistence.local;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.transaction.annotation.Transactional; import org.springframework.data.jpa.repository.Modifying;
import javax.transaction.Transactional;
import java.util.List; import java.util.List;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -18,6 +19,8 @@ public interface LocalInstanceLogRepository extends JpaRepository<LocalInstanceL
Stream<LocalInstanceLogDO> findByInstanceIdOrderByLogTime(Long instanceId); Stream<LocalInstanceLogDO> findByInstanceIdOrderByLogTime(Long instanceId);
// 删除数据 // 删除数据
@Modifying
@Transactional
long deleteByInstanceId(Long instanceId); long deleteByInstanceId(Long instanceId);
long deleteByInstanceIdInAndLogTimeLessThan(List<Long> instanceIds, Long t); long deleteByInstanceIdInAndLogTimeLessThan(List<Long> instanceIds, Long t);

View File

@ -29,7 +29,6 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -108,7 +107,7 @@ public class InstanceLogService {
// MongoDB 获取 // MongoDB 获取
InstanceLogDO mongoLog = mongoTemplate.findOne(Query.query(Criteria.where("instanceId").is(instanceId)), InstanceLogDO.class); InstanceLogDO mongoLog = mongoTemplate.findOne(Query.query(Criteria.where("instanceId").is(instanceId)), InstanceLogDO.class);
if (mongoLog == null) { if (mongoLog == null) {
return "There is no online log for this task instance"; return "There is no online log for this task instance now.";
} }
StringBuilder sb = new StringBuilder(Math.min(Integer.MAX_VALUE, LOG_AVG_SIZE * mongoLog.getLogList().size())); StringBuilder sb = new StringBuilder(Math.min(Integer.MAX_VALUE, LOG_AVG_SIZE * mongoLog.getLogList().size()));
mongoLog.getLogList().forEach(s -> sb.append(s).append(LINE_SEPARATOR)); mongoLog.getLogList().forEach(s -> sb.append(s).append(LINE_SEPARATOR));
@ -127,8 +126,8 @@ public class InstanceLogService {
* 将本地的任务实例运行日志同步到 mongoDB 存储在任务执行结束后异步执行 * 将本地的任务实例运行日志同步到 mongoDB 存储在任务执行结束后异步执行
* @param instanceId 任务实例ID * @param instanceId 任务实例ID
*/ */
@Transactional
@Async("commonTaskExecutor") @Async("commonTaskExecutor")
@Transactional(readOnly = true)
public void sync(Long instanceId) { public void sync(Long instanceId) {
// 休眠10秒等待全部数据上报OmsLogHandler 每隔5秒上报数据 // 休眠10秒等待全部数据上报OmsLogHandler 每隔5秒上报数据
@ -139,42 +138,43 @@ public class InstanceLogService {
Stopwatch sw = Stopwatch.createStarted(); Stopwatch sw = Stopwatch.createStarted();
// 推送数据到 mongoDB
List<String> instanceLogs = Lists.newLinkedList();
// 流式操作避免 OOM至少要扛住 1000W 条日志记录的写入需要测试时监控内存变化 // 流式操作避免 OOM至少要扛住 1000W 条日志记录的写入需要测试时监控内存变化
Stream<LocalInstanceLogDO> allLogs = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId); try (Stream<LocalInstanceLogDO> allLogs = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
AtomicBoolean initialized = new AtomicBoolean(false);
List<String> instanceLogs = Lists.newLinkedList(); // 将整库数据写入 MongoDB
AtomicLong counter = new AtomicLong(0); allLogs.forEach(instanceLog -> {
AtomicBoolean initialized = new AtomicBoolean(false); instanceLogs.add(convertLog(instanceLog));
if (instanceLogs.size() > BATCH_SIZE) {
saveToMongoDB(instanceId, instanceLogs, initialized);
}
});
// 将整库数据写入 MongoDB if (!instanceLogs.isEmpty()) {
allLogs.forEach(instanceLog -> {
counter.incrementAndGet();
instanceLogs.add(convertLog(instanceLog));
if (instanceLogs.size() > BATCH_SIZE) {
saveToMongoDB(instanceId, instanceLogs, initialized); saveToMongoDB(instanceId, instanceLogs, initialized);
} }
}); }catch (Exception e) {
log.warn("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB failed.", instanceId, e);
if (!instanceLogs.isEmpty()) {
saveToMongoDB(instanceId, instanceLogs, initialized);
} }
// 删除本地数据 // 删除本地数据
try { try {
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); long total = CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
instanceIds.remove(instanceId); instanceIds.remove(instanceId);
log.debug("[InstanceLogService] sync local instanceLogs to mongoDB succeed, total logs: {},using: {}.", counter.get(), sw.stop()); log.info("[InstanceLogService] sync local instanceLogs(instanceId={}) to mongoDB succeed, total logs: {},using: {}.", instanceId, total, sw.stop());
}catch (Exception e) { }catch (Exception e) {
log.warn("[InstanceLogService] delete local instanceLogs failed.", e); log.warn("[InstanceLogService] delete local instanceLogs failed.", e);
} }
} }
/** /**
* 拼接日志 -> 2019-4-21 00:00:00.000 192.168.1.1:2777 INFO XXX * 拼接日志 -> 2020-04-29 22:07:10.059 192.168.1.1:2777 INFO XXX
* @param instanceLog 日志对象 * @param instanceLog 日志对象
* @return 字符串 * @return 字符串
*/ */

View File

@ -146,6 +146,8 @@ public class InstanceManager {
*/ */
public static void processFinishedInstance(Long instanceId) { public static void processFinishedInstance(Long instanceId) {
log.info("[InstanceManager] instance(id={}) process finished.", instanceId);
// 清除已完成的实例信息 // 清除已完成的实例信息
instanceId2StatusHolder.remove(instanceId); instanceId2StatusHolder.remove(instanceId);
// 这一步也可能导致后面取不到 JobInfoDO // 这一步也可能导致后面取不到 JobInfoDO

View File

@ -7,6 +7,8 @@ import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collections;
/** /**
* 单机处理器 示例 * 单机处理器 示例
* com.github.kfcfans.oms.server.processors.StandaloneProcessorDemo * com.github.kfcfans.oms.server.processors.StandaloneProcessorDemo
@ -28,6 +30,15 @@ public class StandaloneProcessorDemo implements BasicProcessor {
System.out.println("TaskContext: " + JSONObject.toJSONString(context)); System.out.println("TaskContext: " + JSONObject.toJSONString(context));
System.out.println("ProcessSuccess: " + success); System.out.println("ProcessSuccess: " + success);
context.getOmsLogger().info("StandaloneProcessorDemo finished process,success: .", success); context.getOmsLogger().info("StandaloneProcessorDemo finished process,success: .", success);
// 测试异常日志
try {
Collections.emptyList().add("277");
}catch (Exception e) {
context.getOmsLogger().error("[StandaloneProcessorDemo] process failed.", e);
}
return new ProcessResult(success, context + ": " + success); return new ProcessResult(success, context + ": " + success);
} }
} }

View File

@ -1 +1,3 @@
server.port=8081 server.port=8081
spring.jpa.open-in-view=false

View File

@ -52,7 +52,7 @@ public class OmsServerLogger implements OmsLogger {
private static String genLog(String level, String messagePattern, Object... arg) { private static String genLog(String level, String messagePattern, Object... arg) {
String pattern = LOG_PREFIX + messagePattern; String pattern = LOG_PREFIX + messagePattern;
Object[] newArgs = new Object[arg.length + 2]; Object[] newArgs = new Object[arg.length + 1];
newArgs[0] = level; newArgs[0] = level;
System.arraycopy(arg, 0, newArgs, 1, arg.length); System.arraycopy(arg, 0, newArgs, 1, arg.length);