diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java index a12f05d0..42c67d0c 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java @@ -1,23 +1,5 @@ package tech.powerjob.server.core.instance; -import org.springframework.core.task.AsyncTaskExecutor; -import org.springframework.core.task.TaskExecutor; -import tech.powerjob.common.enums.LogLevel; -import tech.powerjob.common.OmsConstant; -import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.common.model.InstanceLogContent; -import tech.powerjob.common.utils.CommonUtils; -import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.common.utils.SegmentLock; -import tech.powerjob.server.common.constants.PJThreadPool; -import tech.powerjob.server.extension.dfs.*; -import tech.powerjob.server.persistence.storage.Constants; -import tech.powerjob.server.remote.server.redirector.DesignateServer; -import tech.powerjob.server.common.utils.OmsFileUtils; -import tech.powerjob.server.persistence.StringPage; -import tech.powerjob.server.persistence.remote.model.JobInfoDO; -import tech.powerjob.server.persistence.local.LocalInstanceLogDO; -import tech.powerjob.server.persistence.local.LocalInstanceLogRepository; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -27,18 +9,37 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.time.FastDateFormat; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; +import tech.powerjob.common.OmsConstant; +import tech.powerjob.common.enums.LogLevel; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.model.InstanceLogContent; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.common.utils.SegmentLock; +import tech.powerjob.server.common.constants.PJThreadPool; +import tech.powerjob.server.common.utils.OmsFileUtils; +import tech.powerjob.server.extension.dfs.*; +import tech.powerjob.server.persistence.StringPage; +import tech.powerjob.server.persistence.local.LocalInstanceLogDO; +import tech.powerjob.server.persistence.local.LocalInstanceLogRepository; +import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import tech.powerjob.server.persistence.storage.Constants; +import tech.powerjob.server.remote.server.redirector.DesignateServer; import javax.annotation.Resource; import java.io.*; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.*; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -91,9 +92,9 @@ public class InstanceLogService { */ private static final int MAX_LINE_COUNT = 100; /** - * 过期时间 + * 更新中的日志缓存时间 */ - private static final long EXPIRE_INTERVAL_MS = 60000; + private static final long LOG_CACHE_TIME = 10000; /** * 提交日志记录,持久化到本地数据库中 @@ -248,7 +249,7 @@ public class InstanceLogService { return localTransactionTemplate.execute(status -> { File f = new File(path); // 如果文件存在且有效,则不再重新构建日志文件(这个判断也需要放在锁内,否则构建到一半的文件会被返回) - if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < EXPIRE_INTERVAL_MS) { + if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < LOG_CACHE_TIME) { return f; } try { diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java index 215a95a0..2b77c370 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java @@ -17,6 +17,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.List; +import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; /** @@ -40,7 +41,7 @@ public class MapReduceProcessorDemo implements MapReduceProcessor { log.info("taskContext:{}", JsonUtils.toJSONString(context)); // 根据控制台参数获取MR批次及子任务大小 - final JSONObject jobParams = JSONObject.parseObject(context.getJobParams()); + final JSONObject jobParams = Optional.ofNullable(context.getJobParams()).map(JSONObject::parseObject).orElse(new JSONObject()); Integer batchSize = (Integer) jobParams.getOrDefault("batchSize", 100); Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10);