mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
[dev] use a super cool banner
This commit is contained in:
parent
d74a767733
commit
7aa6d2cc6f
@ -11,20 +11,12 @@ import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogMetadata;
|
|||||||
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
|
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
|
||||||
import com.google.common.base.Stopwatch;
|
import com.google.common.base.Stopwatch;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Queues;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.errorprone.annotations.CanIgnoreReturnValue;
|
|
||||||
import com.mongodb.client.gridfs.GridFSBucket;
|
|
||||||
import com.mongodb.client.gridfs.GridFSDownloadStream;
|
|
||||||
import com.mongodb.client.gridfs.model.GridFSFile;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.time.FastDateFormat;
|
import org.apache.commons.lang3.time.FastDateFormat;
|
||||||
import org.springframework.beans.BeanUtils;
|
import org.springframework.beans.BeanUtils;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.data.mongodb.MongoDbFactory;
|
|
||||||
import org.springframework.data.mongodb.core.MongoTemplate;
|
|
||||||
import org.springframework.data.mongodb.core.query.Criteria;
|
|
||||||
import org.springframework.data.mongodb.core.query.Query;
|
|
||||||
import org.springframework.data.mongodb.gridfs.GridFsCriteria;
|
|
||||||
import org.springframework.data.mongodb.gridfs.GridFsResource;
|
import org.springframework.data.mongodb.gridfs.GridFsResource;
|
||||||
import org.springframework.data.mongodb.gridfs.GridFsTemplate;
|
import org.springframework.data.mongodb.gridfs.GridFsTemplate;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.scheduling.annotation.Async;
|
||||||
@ -32,13 +24,15 @@ import org.springframework.scheduling.annotation.Scheduled;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
import org.springframework.util.ResourceUtils;
|
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@ -52,22 +46,37 @@ import java.util.stream.Stream;
|
|||||||
@Service
|
@Service
|
||||||
public class InstanceLogService {
|
public class InstanceLogService {
|
||||||
|
|
||||||
|
// 直接操作 mongoDB 文件系统
|
||||||
private GridFsTemplate gridFsTemplate;
|
private GridFsTemplate gridFsTemplate;
|
||||||
@Resource
|
|
||||||
private LocalInstanceLogRepository localInstanceLogRepository;
|
private LocalInstanceLogRepository localInstanceLogRepository;
|
||||||
|
|
||||||
// 本地维护了在线日志的任务实例ID
|
// 本地维护了在线日志的任务实例ID
|
||||||
private final Set<Long> instanceIds = Sets.newConcurrentHashSet();
|
private final Set<Long> instanceIds = Sets.newConcurrentHashSet();
|
||||||
|
// 锁(可重入锁也太坑了吧,需要考虑同一个线程重复下载的问题 -> 因为下载交给了额外的线程去做...)
|
||||||
|
private final int lockNum;
|
||||||
|
private final Lock[] locks;
|
||||||
|
private final Executor workerPool;
|
||||||
|
|
||||||
private static final String SPACE = " ";
|
// 格式化时间戳
|
||||||
private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
|
private static final FastDateFormat dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");
|
||||||
|
// 用户路径
|
||||||
private static final FastDateFormat dateFormat = FastDateFormat.getInstance(TIME_PATTERN);
|
|
||||||
|
|
||||||
// 文件路径
|
|
||||||
private static final String USER_HOME = System.getProperty("user.home", "oms");
|
private static final String USER_HOME = System.getProperty("user.home", "oms");
|
||||||
// 每一个展示的行数
|
// 每一个展示的行数
|
||||||
private static final int MAX_LINE_COUNT = 500;
|
private static final int MAX_LINE_COUNT = 500;
|
||||||
|
// 过期时间
|
||||||
|
private static final long EXPIRE_INTERVAL_MS = 60000;
|
||||||
|
// 小文件阈值(2M)
|
||||||
|
private static final int SMALL_FILE_MAX_SIZE = 2 * 1024 * 1024;
|
||||||
|
|
||||||
|
public InstanceLogService() {
|
||||||
|
lockNum = Runtime.getRuntime().availableProcessors();
|
||||||
|
locks = new ReentrantLock[lockNum];
|
||||||
|
for (int i = 0; i < lockNum; i++) {
|
||||||
|
locks[i] = new ReentrantLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
workerPool = new ThreadPoolExecutor(lockNum, lockNum, 1, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 提交日志记录,持久化到本地数据库中
|
* 提交日志记录,持久化到本地数据库中
|
||||||
@ -102,22 +111,31 @@ public class InstanceLogService {
|
|||||||
public StringPage fetchInstanceLog(Long instanceId, long index) {
|
public StringPage fetchInstanceLog(Long instanceId, long index) {
|
||||||
|
|
||||||
File logFile = new File(genLogFilePath(instanceId));
|
File logFile = new File(genLogFilePath(instanceId));
|
||||||
|
Lock lock = locks[(int) (instanceId % lockNum)];
|
||||||
|
|
||||||
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
long logCount = localInstanceLogRepository.countByInstanceId(instanceId);
|
long logCount = localInstanceLogRepository.countByInstanceId(instanceId);
|
||||||
|
|
||||||
// 构建本地日志文件
|
// 直接从本地数据库构建日志文件
|
||||||
if (logCount != 0) {
|
if (logCount != 0) {
|
||||||
|
|
||||||
// 数据库中存在,说明数据还在更新中,需要重新生成
|
// 存在则判断上次更改时间,1分钟内有效
|
||||||
|
if (logFile.exists()) {
|
||||||
|
long offset = System.currentTimeMillis() - logFile.lastModified();
|
||||||
|
// 过期才选择重新构建文件
|
||||||
|
if (offset > EXPIRE_INTERVAL_MS) {
|
||||||
Stream<LocalInstanceLogDO> logStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId);
|
Stream<LocalInstanceLogDO> logStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId);
|
||||||
|
|
||||||
|
// 这里直接用 Controller 线程执行,毕竟本地持久化用不了多少时间,因此不用考虑可重入锁的问题
|
||||||
stream2File(logStream, instanceId);
|
stream2File(logStream, instanceId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}else {
|
}else {
|
||||||
|
|
||||||
if (gridFsTemplate == null) {
|
if (gridFsTemplate == null) {
|
||||||
return StringPage.simple("There is no local log for this task now, you need to use mongoDB to store the logs.");
|
return StringPage.simple("There is no local log for this task now, you need to use mongoDB to store the past logs.");
|
||||||
}
|
}
|
||||||
|
|
||||||
// 不存在,需要重新下载
|
// 不存在,需要重新下载
|
||||||
@ -125,16 +143,16 @@ public class InstanceLogService {
|
|||||||
GridFsResource gridFsResource = gridFsTemplate.getResource(genLogFileName(instanceId));
|
GridFsResource gridFsResource = gridFsTemplate.getResource(genLogFileName(instanceId));
|
||||||
|
|
||||||
if (!gridFsResource.exists()) {
|
if (!gridFsResource.exists()) {
|
||||||
return StringPage.simple("There is no online log for this job instance");
|
return StringPage.simple("There is no online log for this job instance.");
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] buffer = new byte[1024];
|
long targetFileSize = gridFsResource.contentLength();
|
||||||
try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream());
|
// 小文件 Controller 线程直接执行,大文件则异步下载,先返回 downloading...
|
||||||
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile))
|
if (targetFileSize <= SMALL_FILE_MAX_SIZE) {
|
||||||
) {
|
gridFs2File(gridFsResource, logFile);
|
||||||
while (gis.read(buffer) != -1) {
|
}else {
|
||||||
bos.write(buffer);
|
workerPool.execute(() -> gridFs2File(gridFsResource, logFile));
|
||||||
}
|
return StringPage.simple("downloading from mongoDB, please retry some time later~");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -164,11 +182,11 @@ public class InstanceLogService {
|
|||||||
|
|
||||||
double totalPage = Math.ceil(1.0 * lines / MAX_LINE_COUNT);
|
double totalPage = Math.ceil(1.0 * lines / MAX_LINE_COUNT);
|
||||||
return new StringPage(index, (long) totalPage, sb.toString());
|
return new StringPage(index, (long) totalPage, sb.toString());
|
||||||
|
|
||||||
|
|
||||||
}catch (Exception e) {
|
}catch (Exception e) {
|
||||||
log.error("[InstanceLogService] fetchInstanceLog for instance(instanceId={}) failed.", instanceId, e);
|
log.error("[InstanceLogService] fetchInstanceLog for instance(instanceId={}) failed.", instanceId, e);
|
||||||
return StringPage.simple("unknown error from oms-server, please see oms-server's log to find the problem");
|
return StringPage.simple("unknown error from oms-server, please see oms-server's log to find the problem");
|
||||||
|
}finally {
|
||||||
|
lock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -231,10 +249,14 @@ public class InstanceLogService {
|
|||||||
* @param stream 流
|
* @param stream 流
|
||||||
* @param instanceId 任务实例ID
|
* @param instanceId 任务实例ID
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("all")
|
|
||||||
private void stream2File(Stream<LocalInstanceLogDO> stream, long instanceId) {
|
private void stream2File(Stream<LocalInstanceLogDO> stream, long instanceId) {
|
||||||
File logFile = new File(genLogFilePath(instanceId));
|
File logFile = new File(genLogFilePath(instanceId));
|
||||||
logFile.getParentFile().mkdirs();
|
if (!logFile.getParentFile().exists()) {
|
||||||
|
if (!logFile.getParentFile().mkdirs()) {
|
||||||
|
log.warn("[InstanceLogService] create dir for instanceLog failed, path is {}.", logFile.getPath());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
try (FileWriter fw = new FileWriter(logFile); BufferedWriter bfw = new BufferedWriter(fw)) {
|
try (FileWriter fw = new FileWriter(logFile); BufferedWriter bfw = new BufferedWriter(fw)) {
|
||||||
stream.forEach(instanceLog -> {
|
stream.forEach(instanceLog -> {
|
||||||
try {
|
try {
|
||||||
@ -249,6 +271,25 @@ public class InstanceLogService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 将MongoDB中存储的日志持久化为磁盘日志
|
||||||
|
* @param gridFsResource mongoDB 文件资源
|
||||||
|
* @param logFile 本地文件资源
|
||||||
|
*/
|
||||||
|
private void gridFs2File(GridFsResource gridFsResource, File logFile) {
|
||||||
|
byte[] buffer = new byte[1024];
|
||||||
|
try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream());
|
||||||
|
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile))
|
||||||
|
) {
|
||||||
|
while (gis.read(buffer) != -1) {
|
||||||
|
bos.write(buffer);
|
||||||
|
}
|
||||||
|
bos.flush();
|
||||||
|
}catch (Exception e) {
|
||||||
|
log.warn("[InstanceLogService] download instanceLog to local file({}) failed.", logFile.getName(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -257,7 +298,8 @@ public class InstanceLogService {
|
|||||||
* @return 字符串
|
* @return 字符串
|
||||||
*/
|
*/
|
||||||
private static String convertLog(LocalInstanceLogDO instanceLog) {
|
private static String convertLog(LocalInstanceLogDO instanceLog) {
|
||||||
return dateFormat.format(instanceLog.getLogTime()) + SPACE + instanceLog.getWorkerAddress() + SPACE + instanceLog.getLogContent();
|
String pattern = "%s [%s] -%s";
|
||||||
|
return String.format(pattern, dateFormat.format(instanceLog.getLogTime()), instanceLog.getLogContent());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -302,4 +344,8 @@ public class InstanceLogService {
|
|||||||
public void setGridFsTemplate(GridFsTemplate gridFsTemplate) {
|
public void setGridFsTemplate(GridFsTemplate gridFsTemplate) {
|
||||||
this.gridFsTemplate = gridFsTemplate;
|
this.gridFsTemplate = gridFsTemplate;
|
||||||
}
|
}
|
||||||
|
@Autowired
|
||||||
|
public void setLocalInstanceLogRepository(LocalInstanceLogRepository localInstanceLogRepository) {
|
||||||
|
this.localInstanceLogRepository = localInstanceLogRepository;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
14
oh-my-scheduler-server/src/main/resources/banner.txt
Normal file
14
oh-my-scheduler-server/src/main/resources/banner.txt
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
${AnsiColor.GREEN}
|
||||||
|
|
||||||
|
███████ ██ ████ ████ ████████ ██ ██ ██
|
||||||
|
██░░░░░██ ░██ ░██░██ ██░██ ██ ██ ██░░░░░░ ░██ ░██ ░██
|
||||||
|
██ ░░██░██ ░██░░██ ██ ░██ ░░██ ██ ░██ █████ ░██ █████ ░██ ██ ██ ░██ █████ ██████
|
||||||
|
░██ ░██░██████ ░██ ░░███ ░██ ░░███ ░█████████ ██░░░██░██████ ██░░░██ ██████░██ ░██ ░██ ██░░░██░░██░░█
|
||||||
|
░██ ░██░██░░░██░██ ░░█ ░██ ░██ ░░░░░░░░██░██ ░░ ░██░░░██░███████ ██░░░██░██ ░██ ░██░███████ ░██ ░
|
||||||
|
░░██ ██ ░██ ░██░██ ░ ░██ ██ ░██░██ ██░██ ░██░██░░░░ ░██ ░██░██ ░██ ░██░██░░░░ ░██
|
||||||
|
░░███████ ░██ ░██░██ ░██ ██ ████████ ░░█████ ░██ ░██░░██████░░██████░░██████ ███░░██████░███
|
||||||
|
░░░░░░░ ░░ ░░ ░░ ░░ ░░ ░░░░░░░░ ░░░░░ ░░ ░░ ░░░░░░ ░░░░░░ ░░░░░░ ░░░ ░░░░░░ ░░░
|
||||||
|
${AnsiColor.BRIGHT_RED}
|
||||||
|
* Maintainer: tengjiqi@gmail.com
|
||||||
|
* SourceCode: https://github.com/KFCFans/OhMyScheduler
|
||||||
|
* PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4)
|
Loading…
x
Reference in New Issue
Block a user