From 6f28032a0bfce1a5245e2af0a67771c3a13f3dde Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 29 Apr 2020 18:21:11 +0800 Subject: [PATCH] develop the log controller --- oh-my-scheduler-client/pom.xml | 4 +- oh-my-scheduler-common/pom.xml | 2 +- oh-my-scheduler-server/pom.xml | 4 +- .../local/LocalInstanceLogRepository.java | 1 + .../server/service/InstanceLogService.java | 59 +++++++++++++++++-- .../web/controller/InstanceController.java | 46 +++++++++++++++ oh-my-scheduler-worker-samples/pom.xml | 2 +- oh-my-scheduler-worker/pom.xml | 4 +- 8 files changed, 110 insertions(+), 12 deletions(-) diff --git a/oh-my-scheduler-client/pom.xml b/oh-my-scheduler-client/pom.xml index 6ab10fb3..e1fe5534 100644 --- a/oh-my-scheduler-client/pom.xml +++ b/oh-my-scheduler-client/pom.xml @@ -10,11 +10,11 @@ 4.0.0 oh-my-scheduler-client - 1.0.0 + 1.0.1 jar - 1.0.0 + 1.0.1 5.6.1 diff --git a/oh-my-scheduler-common/pom.xml b/oh-my-scheduler-common/pom.xml index 02ac3024..860d4edb 100644 --- a/oh-my-scheduler-common/pom.xml +++ b/oh-my-scheduler-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 oh-my-scheduler-common - 1.0.0 + 1.0.1 jar diff --git a/oh-my-scheduler-server/pom.xml b/oh-my-scheduler-server/pom.xml index 4628795a..5835bc86 100644 --- a/oh-my-scheduler-server/pom.xml +++ b/oh-my-scheduler-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 oh-my-scheduler-server - 1.0.0 + 1.0.1 jar 2.9.2 2.2.6.RELEASE - 1.0.0 + 1.0.1 8.0.19 1.4.200 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java index 0b189247..e2821eb2 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java @@ -21,4 +21,5 @@ public interface LocalInstanceLogRepository extends JpaRepository instanceIds, Long t); + long countByInstanceId(Long instanceId); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java index 7d07a3a1..661ec6d9 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java @@ -51,9 +51,13 @@ public class InstanceLogService { private final Set instanceIds = Sets.newConcurrentHashSet(); private static final String SPACE = " "; + private static final String LINE_SEPARATOR = "\n"; private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS"; private static final int BATCH_SIZE = 1000; + private static final int LOG_AVG_SIZE = 100; + + private static final FastDateFormat dateFormat = FastDateFormat.getInstance(TIME_PATTERN); /** * 提交日志记录,持久化到本地数据库中 @@ -78,6 +82,45 @@ public class InstanceLogService { } } + /** + * 获取任务实例运行日志(默认存在本地数据,需要由生成完成请求的路由与转发) + * @param instanceId 任务实例ID + * @return 文本字符串 + */ + public String fetchInstanceLog(Long instanceId) { + + try { + + long logCount = localInstanceLogRepository.countByInstanceId(instanceId); + + // 本地存在数据,直接返回 + if (logCount != 0) { + + Stream logStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId); + int strSize = (int) Math.min(Integer.MAX_VALUE, LOG_AVG_SIZE * logCount); + StringBuilder sb = new StringBuilder(strSize); + logStream.forEach(instanceLogDO -> sb.append(convertLog(instanceLogDO)).append(LINE_SEPARATOR)); + return sb.toString(); + } + + // 从 MongoDB 获取 + InstanceLogDO mongoLog = mongoTemplate.findOne(Query.query(Criteria.where("instanceId").is(instanceId)), InstanceLogDO.class); + if (mongoLog == null) { + return "There is no online log for this task instance"; + } + StringBuilder sb = new StringBuilder(Math.min(Integer.MAX_VALUE, LOG_AVG_SIZE * mongoLog.getLogList().size())); + mongoLog.getLogList().forEach(s -> sb.append(s).append(LINE_SEPARATOR)); + return sb.toString(); + + }catch (Exception e) { + log.error("[InstanceLogService] fetchInstanceLog for instance(instanceId={}) failed.", instanceId, e); + return "unknown error from oms-server"; + }catch (OutOfMemoryError oe) { + log.error("[InstanceLogService] The log for instance(instanceId={}) is too large.", instanceId, oe); + return "The log is too large to display directly."; + } + } + /** * 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行 * @param instanceId 任务实例ID @@ -92,7 +135,7 @@ public class InstanceLogService { } Stopwatch sw = Stopwatch.createStarted(); - FastDateFormat dateFormat = FastDateFormat.getInstance(TIME_PATTERN); + // 流式操作避免 OOM,至少要扛住 1000W 条日志记录的写入(需要测试时监控内存变化) Stream allLogs = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId); @@ -105,9 +148,7 @@ public class InstanceLogService { allLogs.forEach(instanceLog -> { counter.incrementAndGet(); - // 拼接日志 -> 2019-4-21 00:00:00.000 192.168.1.1:2777 INFO XXX - String logStr = dateFormat.format(instanceLog.getLogTime()) + SPACE + instanceLog.getWorkerAddress() + SPACE + instanceLog.getLogContent(); - instanceLogs.add(logStr); + instanceLogs.add(convertLog(instanceLog)); if (instanceLogs.size() > BATCH_SIZE) { saveToMongoDB(instanceId, instanceLogs, initialized); @@ -129,6 +170,15 @@ public class InstanceLogService { } } + /** + * 拼接日志 -> 2019-4-21 00:00:00.000 192.168.1.1:2777 INFO XXX + * @param instanceLog 日志对象 + * @return 字符串 + */ + private static String convertLog(LocalInstanceLogDO instanceLog) { + return dateFormat.format(instanceLog.getLogTime()) + SPACE + instanceLog.getWorkerAddress() + SPACE + instanceLog.getLogContent(); + } + private void saveToMongoDB(Long instanceId, List logList, AtomicBoolean initialized) { try { @@ -181,4 +231,5 @@ public class InstanceLogService { }); } } + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java index a85a4bcd..1cbdec75 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java @@ -3,22 +3,30 @@ package com.github.kfcfans.oms.server.web.controller; import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.common.model.InstanceDetail; +import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.persistence.PageResult; +import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO; +import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.oms.server.service.CacheService; +import com.github.kfcfans.oms.server.service.InstanceLogService; +import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.github.kfcfans.oms.server.service.instance.InstanceService; import com.github.kfcfans.oms.server.web.request.QueryInstanceRequest; import com.github.kfcfans.oms.server.web.response.InstanceLogVO; import org.apache.commons.lang3.time.DateFormatUtils; import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; +import javax.servlet.http.HttpServletResponse; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; @@ -32,11 +40,19 @@ import java.util.stream.Collectors; @RequestMapping("/instance") public class InstanceController { + @Value("${server.port}") + private int port; + @Resource private InstanceService instanceService; + @Resource + private InstanceLogService instanceLogService; + @Resource private CacheService cacheService; @Resource + private AppInfoRepository appInfoRepository; + @Resource private InstanceInfoRepository instanceInfoRepository; private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; @@ -52,6 +68,36 @@ public class InstanceController { return ResultDTO.success(instanceService.getInstanceDetail(Long.valueOf(instanceId))); } + @GetMapping("/log") + public ResultDTO getInstanceLog(Long instanceId, HttpServletResponse response) { + + InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); + if (instanceInfo == null) { + return ResultDTO.failed("invalid instanceId: " + instanceId); + } + + Optional appInfoOpt = appInfoRepository.findById(instanceInfo.getAppId()); + if (!appInfoOpt.isPresent()) { + return ResultDTO.failed("impossible"); + } + + String targetServer = appInfoOpt.get().getCurrentServer(); + + // 转发HTTP请求 + if (!OhMyServer.getActorSystemAddress().equals(targetServer)) { + String ip = targetServer.split(":")[0]; + String url = "http://" + ip + ":" + port + "/instance/log?instanceId=" + instanceId; + try { + response.sendRedirect(url); + return ResultDTO.success("redirecting..."); + }catch (Exception e) { + return ResultDTO.failed(e); + } + } + + return ResultDTO.success(instanceLogService.fetchInstanceLog(instanceId)); + } + @PostMapping("/list") public ResultDTO> list(@RequestBody QueryInstanceRequest request) { diff --git a/oh-my-scheduler-worker-samples/pom.xml b/oh-my-scheduler-worker-samples/pom.xml index 94cf9530..dc985e6a 100644 --- a/oh-my-scheduler-worker-samples/pom.xml +++ b/oh-my-scheduler-worker-samples/pom.xml @@ -14,7 +14,7 @@ 2.2.6.RELEASE - 1.0.0 + 1.0.1 1.2.68 diff --git a/oh-my-scheduler-worker/pom.xml b/oh-my-scheduler-worker/pom.xml index 031ba74c..08e42707 100644 --- a/oh-my-scheduler-worker/pom.xml +++ b/oh-my-scheduler-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 oh-my-scheduler-worker - 1.0.0 + 1.0.1 jar 5.2.4.RELEASE - 1.0.0 + 1.0.1 1.4.200 3.4.2 5.6.1