mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: optimize online log cache time(60 -> 10)
This commit is contained in:
parent
6bcc275a70
commit
09b15dfbc1
@ -1,23 +1,5 @@
|
|||||||
package tech.powerjob.server.core.instance;
|
package tech.powerjob.server.core.instance;
|
||||||
|
|
||||||
import org.springframework.core.task.AsyncTaskExecutor;
|
|
||||||
import org.springframework.core.task.TaskExecutor;
|
|
||||||
import tech.powerjob.common.enums.LogLevel;
|
|
||||||
import tech.powerjob.common.OmsConstant;
|
|
||||||
import tech.powerjob.common.enums.TimeExpressionType;
|
|
||||||
import tech.powerjob.common.model.InstanceLogContent;
|
|
||||||
import tech.powerjob.common.utils.CommonUtils;
|
|
||||||
import tech.powerjob.common.utils.NetUtils;
|
|
||||||
import tech.powerjob.common.utils.SegmentLock;
|
|
||||||
import tech.powerjob.server.common.constants.PJThreadPool;
|
|
||||||
import tech.powerjob.server.extension.dfs.*;
|
|
||||||
import tech.powerjob.server.persistence.storage.Constants;
|
|
||||||
import tech.powerjob.server.remote.server.redirector.DesignateServer;
|
|
||||||
import tech.powerjob.server.common.utils.OmsFileUtils;
|
|
||||||
import tech.powerjob.server.persistence.StringPage;
|
|
||||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
|
||||||
import tech.powerjob.server.persistence.local.LocalInstanceLogDO;
|
|
||||||
import tech.powerjob.server.persistence.local.LocalInstanceLogRepository;
|
|
||||||
import com.google.common.base.Stopwatch;
|
import com.google.common.base.Stopwatch;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
@ -27,18 +9,37 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
|
|||||||
import org.apache.commons.lang3.time.FastDateFormat;
|
import org.apache.commons.lang3.time.FastDateFormat;
|
||||||
import org.springframework.beans.BeanUtils;
|
import org.springframework.beans.BeanUtils;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.core.task.AsyncTaskExecutor;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
import tech.powerjob.common.OmsConstant;
|
||||||
|
import tech.powerjob.common.enums.LogLevel;
|
||||||
|
import tech.powerjob.common.enums.TimeExpressionType;
|
||||||
|
import tech.powerjob.common.model.InstanceLogContent;
|
||||||
|
import tech.powerjob.common.utils.CommonUtils;
|
||||||
|
import tech.powerjob.common.utils.NetUtils;
|
||||||
|
import tech.powerjob.common.utils.SegmentLock;
|
||||||
|
import tech.powerjob.server.common.constants.PJThreadPool;
|
||||||
|
import tech.powerjob.server.common.utils.OmsFileUtils;
|
||||||
|
import tech.powerjob.server.extension.dfs.*;
|
||||||
|
import tech.powerjob.server.persistence.StringPage;
|
||||||
|
import tech.powerjob.server.persistence.local.LocalInstanceLogDO;
|
||||||
|
import tech.powerjob.server.persistence.local.LocalInstanceLogRepository;
|
||||||
|
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||||
|
import tech.powerjob.server.persistence.storage.Constants;
|
||||||
|
import tech.powerjob.server.remote.server.redirector.DesignateServer;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@ -91,9 +92,9 @@ public class InstanceLogService {
|
|||||||
*/
|
*/
|
||||||
private static final int MAX_LINE_COUNT = 100;
|
private static final int MAX_LINE_COUNT = 100;
|
||||||
/**
|
/**
|
||||||
* 过期时间
|
* 更新中的日志缓存时间
|
||||||
*/
|
*/
|
||||||
private static final long EXPIRE_INTERVAL_MS = 60000;
|
private static final long LOG_CACHE_TIME = 10000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 提交日志记录,持久化到本地数据库中
|
* 提交日志记录,持久化到本地数据库中
|
||||||
@ -248,7 +249,7 @@ public class InstanceLogService {
|
|||||||
return localTransactionTemplate.execute(status -> {
|
return localTransactionTemplate.execute(status -> {
|
||||||
File f = new File(path);
|
File f = new File(path);
|
||||||
// 如果文件存在且有效,则不再重新构建日志文件(这个判断也需要放在锁内,否则构建到一半的文件会被返回)
|
// 如果文件存在且有效,则不再重新构建日志文件(这个判断也需要放在锁内,否则构建到一半的文件会被返回)
|
||||||
if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < EXPIRE_INTERVAL_MS) {
|
if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < LOG_CACHE_TIME) {
|
||||||
return f;
|
return f;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
@ -17,6 +17,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -40,7 +41,7 @@ public class MapReduceProcessorDemo implements MapReduceProcessor {
|
|||||||
log.info("taskContext:{}", JsonUtils.toJSONString(context));
|
log.info("taskContext:{}", JsonUtils.toJSONString(context));
|
||||||
|
|
||||||
// 根据控制台参数获取MR批次及子任务大小
|
// 根据控制台参数获取MR批次及子任务大小
|
||||||
final JSONObject jobParams = JSONObject.parseObject(context.getJobParams());
|
final JSONObject jobParams = Optional.ofNullable(context.getJobParams()).map(JSONObject::parseObject).orElse(new JSONObject());
|
||||||
|
|
||||||
Integer batchSize = (Integer) jobParams.getOrDefault("batchSize", 100);
|
Integer batchSize = (Integer) jobParams.getOrDefault("batchSize", 100);
|
||||||
Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10);
|
Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user