diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java index a2ac7236..dd5075a3 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java @@ -11,20 +11,12 @@ import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogMetadata; import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import com.google.common.collect.Queues; import com.google.common.collect.Sets; -import com.google.errorprone.annotations.CanIgnoreReturnValue; -import com.mongodb.client.gridfs.GridFSBucket; -import com.mongodb.client.gridfs.GridFSDownloadStream; -import com.mongodb.client.gridfs.model.GridFSFile; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.FastDateFormat; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.mongodb.MongoDbFactory; -import org.springframework.data.mongodb.core.MongoTemplate; -import org.springframework.data.mongodb.core.query.Criteria; -import org.springframework.data.mongodb.core.query.Query; -import org.springframework.data.mongodb.gridfs.GridFsCriteria; import org.springframework.data.mongodb.gridfs.GridFsResource; import org.springframework.data.mongodb.gridfs.GridFsTemplate; import org.springframework.scheduling.annotation.Async; @@ -32,13 +24,15 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; -import org.springframework.util.ResourceUtils; -import javax.annotation.Resource; import java.io.*; import java.util.List; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -52,22 +46,37 @@ import java.util.stream.Stream; @Service public class InstanceLogService { + // 直接操作 mongoDB 文件系统 private GridFsTemplate gridFsTemplate; - @Resource private LocalInstanceLogRepository localInstanceLogRepository; // 本地维护了在线日志的任务实例ID private final Set instanceIds = Sets.newConcurrentHashSet(); + // 锁(可重入锁也太坑了吧,需要考虑同一个线程重复下载的问题 -> 因为下载交给了额外的线程去做...) + private final int lockNum; + private final Lock[] locks; + private final Executor workerPool; - private static final String SPACE = " "; - private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS"; - - private static final FastDateFormat dateFormat = FastDateFormat.getInstance(TIME_PATTERN); - - // 文件路径 + // 格式化时间戳 + private static final FastDateFormat dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS"); + // 用户路径 private static final String USER_HOME = System.getProperty("user.home", "oms"); // 每一个展示的行数 private static final int MAX_LINE_COUNT = 500; + // 过期时间 + private static final long EXPIRE_INTERVAL_MS = 60000; + // 小文件阈值(2M) + private static final int SMALL_FILE_MAX_SIZE = 2 * 1024 * 1024; + + public InstanceLogService() { + lockNum = Runtime.getRuntime().availableProcessors(); + locks = new ReentrantLock[lockNum]; + for (int i = 0; i < lockNum; i++) { + locks[i] = new ReentrantLock(); + } + + workerPool = new ThreadPoolExecutor(lockNum, lockNum, 1, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue()); + } /** * 提交日志记录,持久化到本地数据库中 @@ -102,22 +111,31 @@ public class InstanceLogService { public StringPage fetchInstanceLog(Long instanceId, long index) { File logFile = new File(genLogFilePath(instanceId)); + Lock lock = locks[(int) (instanceId % lockNum)]; + lock.lock(); try { - long logCount = localInstanceLogRepository.countByInstanceId(instanceId); - // 构建本地日志文件 + // 直接从本地数据库构建日志文件 if (logCount != 0) { - // 数据库中存在,说明数据还在更新中,需要重新生成 - Stream logStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId); - stream2File(logStream, instanceId); + // 存在则判断上次更改时间,1分钟内有效 + if (logFile.exists()) { + long offset = System.currentTimeMillis() - logFile.lastModified(); + // 过期才选择重新构建文件 + if (offset > EXPIRE_INTERVAL_MS) { + Stream logStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId); + + // 这里直接用 Controller 线程执行,毕竟本地持久化用不了多少时间,因此不用考虑可重入锁的问题 + stream2File(logStream, instanceId); + } + } }else { if (gridFsTemplate == null) { - return StringPage.simple("There is no local log for this task now, you need to use mongoDB to store the logs."); + return StringPage.simple("There is no local log for this task now, you need to use mongoDB to store the past logs."); } // 不存在,需要重新下载 @@ -125,16 +143,16 @@ public class InstanceLogService { GridFsResource gridFsResource = gridFsTemplate.getResource(genLogFileName(instanceId)); if (!gridFsResource.exists()) { - return StringPage.simple("There is no online log for this job instance"); + return StringPage.simple("There is no online log for this job instance."); } - byte[] buffer = new byte[1024]; - try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream()); - BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile)) - ) { - while (gis.read(buffer) != -1) { - bos.write(buffer); - } + long targetFileSize = gridFsResource.contentLength(); + // 小文件 Controller 线程直接执行,大文件则异步下载,先返回 downloading... + if (targetFileSize <= SMALL_FILE_MAX_SIZE) { + gridFs2File(gridFsResource, logFile); + }else { + workerPool.execute(() -> gridFs2File(gridFsResource, logFile)); + return StringPage.simple("downloading from mongoDB, please retry some time later~"); } } } @@ -164,11 +182,11 @@ public class InstanceLogService { double totalPage = Math.ceil(1.0 * lines / MAX_LINE_COUNT); return new StringPage(index, (long) totalPage, sb.toString()); - - }catch (Exception e) { log.error("[InstanceLogService] fetchInstanceLog for instance(instanceId={}) failed.", instanceId, e); return StringPage.simple("unknown error from oms-server, please see oms-server's log to find the problem"); + }finally { + lock.unlock(); } } @@ -231,10 +249,14 @@ public class InstanceLogService { * @param stream 流 * @param instanceId 任务实例ID */ - @SuppressWarnings("all") private void stream2File(Stream stream, long instanceId) { File logFile = new File(genLogFilePath(instanceId)); - logFile.getParentFile().mkdirs(); + if (!logFile.getParentFile().exists()) { + if (!logFile.getParentFile().mkdirs()) { + log.warn("[InstanceLogService] create dir for instanceLog failed, path is {}.", logFile.getPath()); + return; + } + } try (FileWriter fw = new FileWriter(logFile); BufferedWriter bfw = new BufferedWriter(fw)) { stream.forEach(instanceLog -> { try { @@ -249,6 +271,25 @@ public class InstanceLogService { } } + /** + * 将MongoDB中存储的日志持久化为磁盘日志 + * @param gridFsResource mongoDB 文件资源 + * @param logFile 本地文件资源 + */ + private void gridFs2File(GridFsResource gridFsResource, File logFile) { + byte[] buffer = new byte[1024]; + try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream()); + BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile)) + ) { + while (gis.read(buffer) != -1) { + bos.write(buffer); + } + bos.flush(); + }catch (Exception e) { + log.warn("[InstanceLogService] download instanceLog to local file({}) failed.", logFile.getName(), e); + } + } + /** @@ -257,7 +298,8 @@ public class InstanceLogService { * @return 字符串 */ private static String convertLog(LocalInstanceLogDO instanceLog) { - return dateFormat.format(instanceLog.getLogTime()) + SPACE + instanceLog.getWorkerAddress() + SPACE + instanceLog.getLogContent(); + String pattern = "%s [%s] -%s"; + return String.format(pattern, dateFormat.format(instanceLog.getLogTime()), instanceLog.getLogContent()); } @@ -302,4 +344,8 @@ public class InstanceLogService { public void setGridFsTemplate(GridFsTemplate gridFsTemplate) { this.gridFsTemplate = gridFsTemplate; } + @Autowired + public void setLocalInstanceLogRepository(LocalInstanceLogRepository localInstanceLogRepository) { + this.localInstanceLogRepository = localInstanceLogRepository; + } } diff --git a/oh-my-scheduler-server/src/main/resources/banner.txt b/oh-my-scheduler-server/src/main/resources/banner.txt new file mode 100644 index 00000000..a643c395 --- /dev/null +++ b/oh-my-scheduler-server/src/main/resources/banner.txt @@ -0,0 +1,14 @@ +${AnsiColor.GREEN} + + ███████ ██ ████ ████ ████████ ██ ██ ██ + ██░░░░░██ ░██ ░██░██ ██░██ ██ ██ ██░░░░░░ ░██ ░██ ░██ + ██ ░░██░██ ░██░░██ ██ ░██ ░░██ ██ ░██ █████ ░██ █████ ░██ ██ ██ ░██ █████ ██████ +░██ ░██░██████ ░██ ░░███ ░██ ░░███ ░█████████ ██░░░██░██████ ██░░░██ ██████░██ ░██ ░██ ██░░░██░░██░░█ +░██ ░██░██░░░██░██ ░░█ ░██ ░██ ░░░░░░░░██░██ ░░ ░██░░░██░███████ ██░░░██░██ ░██ ░██░███████ ░██ ░ +░░██ ██ ░██ ░██░██ ░ ░██ ██ ░██░██ ██░██ ░██░██░░░░ ░██ ░██░██ ░██ ░██░██░░░░ ░██ + ░░███████ ░██ ░██░██ ░██ ██ ████████ ░░█████ ░██ ░██░░██████░░██████░░██████ ███░░██████░███ + ░░░░░░░ ░░ ░░ ░░ ░░ ░░ ░░░░░░░░ ░░░░░ ░░ ░░ ░░░░░░ ░░░░░░ ░░░░░░ ░░░ ░░░░░░ ░░░ +${AnsiColor.BRIGHT_RED} +* Maintainer: tengjiqi@gmail.com +* SourceCode: https://github.com/KFCFans/OhMyScheduler +* PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4)