diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/ThreadPoolConfig.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/ThreadPoolConfig.java new file mode 100644 index 00000000..0639ff76 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/ThreadPoolConfig.java @@ -0,0 +1,46 @@ +package com.github.kfcfans.oms.server.common.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 公用线程池配置 + * + * @author tjq + * @since 2020/4/28 + */ +@EnableAsync +@Configuration +public class ThreadPoolConfig { + + @Bean("timingTaskExecutor") + public Executor getTimingPool() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors()); + executor.setQueueCapacity(1024); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("timingTaskExecutor-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + return executor; + } + + + @Bean("commonTaskExecutor") + public Executor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors()); + executor.setQueueCapacity(1024); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("commonTaskExecutor-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + return executor; + } + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java index bf99d3f7..0b189247 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java @@ -1,6 +1,8 @@ package com.github.kfcfans.oms.server.persistence.local; import org.springframework.data.jpa.repository.JpaRepository; + +import java.util.List; import java.util.stream.Stream; /** @@ -17,4 +19,6 @@ public interface LocalInstanceLogRepository extends JpaRepository instanceIds, Long t); + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogDO.java index 79afc054..a213b5d6 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/mongodb/InstanceLogDO.java @@ -9,7 +9,7 @@ import java.util.List; /** * 任务实例的运行时日志 * - * @author tjq + * @author YuHuaFans(余华的小说确实挺好看的...虽然看完总是要忧郁几天...) * @since 2020/4/27 */ @Data diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java index a6756a7f..7d07a3a1 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java @@ -1,12 +1,16 @@ package com.github.kfcfans.oms.server.service; +import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.common.model.InstanceLogContent; import com.github.kfcfans.common.utils.CommonUtils; +import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogDO; import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogRepository; import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogDO; +import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.FastDateFormat; import org.springframework.beans.BeanUtils; @@ -15,10 +19,14 @@ import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -39,6 +47,9 @@ public class InstanceLogService { @Resource private LocalInstanceLogRepository localInstanceLogRepository; + // 本地维护了在线日志的任务实例ID + private final Set instanceIds = Sets.newConcurrentHashSet(); + private static final String SPACE = " "; private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS"; @@ -52,6 +63,8 @@ public class InstanceLogService { public void submitLogs(String workerAddress, List logs) { List logList = logs.stream().map(x -> { + instanceIds.add(x.getInstanceId()); + LocalInstanceLogDO y = new LocalInstanceLogDO(); BeanUtils.copyProperties(x, y); y.setWorkerAddress(workerAddress); @@ -69,9 +82,15 @@ public class InstanceLogService { * 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行 * @param instanceId 任务实例ID */ - @Async + @Async("commonTaskExecutor") public void sync(Long instanceId) { + // 休眠10秒等待全部数据上报(OmsLogHandler 每隔5秒上报数据) + try { + TimeUnit.SECONDS.sleep(10); + }catch (Exception ignore) { + } + Stopwatch sw = Stopwatch.createStarted(); FastDateFormat dateFormat = FastDateFormat.getInstance(TIME_PATTERN); @@ -102,11 +121,12 @@ public class InstanceLogService { // 删除本地数据 try { CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); + + instanceIds.remove(instanceId); + log.debug("[InstanceLogService] sync local instanceLogs to mongoDB succeed, total logs: {},using: {}.", counter.get(), sw.stop()); }catch (Exception e) { log.warn("[InstanceLogService] delete local instanceLogs failed.", e); } - - log.debug("[InstanceLogService] sync local instanceLogs to mongoDB succeed, total logs: {},using: {}.", counter.get(), sw.stop()); } private void saveToMongoDB(Long instanceId, List logList, AtomicBoolean initialized) { @@ -131,4 +151,34 @@ public class InstanceLogService { log.warn("[InstanceLogService] push instanceLog(instanceId={},logList={}) to mongoDB failed.", instanceId, logList, e); } } + + @Async("timingTaskExecutor") + @Scheduled(fixedDelay = 60000) + public void timingCheck() { + + // 1. 定时删除秒级任务的日志 + List frequentInstanceIds = Lists.newLinkedList(); + instanceIds.forEach(instanceId -> { + JobInfoDO jobInfo = InstanceManager.fetchJobInfo(instanceId); + if (jobInfo == null) { + return; + } + + if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) { + frequentInstanceIds.add(instanceId); + } + }); + + if (!CollectionUtils.isEmpty(frequentInstanceIds)) { + // 只保留最近10分钟的日志 + long time = System.currentTimeMillis() - 10 * 60 * 1000; + Lists.partition(frequentInstanceIds, 100).forEach(p -> { + try { + localInstanceLogRepository.deleteByInstanceIdInAndLogTimeLessThan(p, time); + }catch (Exception e) { + log.warn("[InstanceLogService] delete expired logs for instance: {} failed.", p, e); + } + }); + } + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java index a010a6eb..9466d091 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java @@ -9,6 +9,7 @@ import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.oms.server.service.DispatchService; +import com.github.kfcfans.oms.server.service.InstanceLogService; import com.github.kfcfans.oms.server.service.timing.schedule.HashedWheelTimerHolder; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; @@ -35,6 +36,7 @@ public class InstanceManager { // Spring Bean private static DispatchService dispatchService; + private static InstanceLogService instanceLogService; private static InstanceInfoRepository instanceInfoRepository; private static JobInfoRepository jobInfoRepository; @@ -133,14 +135,30 @@ public class InstanceManager { // 同步状态变更信息到数据库 getInstanceInfoRepository().saveAndFlush(updateEntity); - // 清除已完成的实例信息 if (finished) { - instanceId2StatusHolder.remove(instanceId); - // 这一步也可能导致后面取不到 JobInfoDO - instanceId2JobInfo.remove(instanceId); + processFinishedInstance(instanceId); } } + /** + * 收尾完成的任务实例 + * @param instanceId 任务实例ID + */ + public static void processFinishedInstance(Long instanceId) { + + // 清除已完成的实例信息 + instanceId2StatusHolder.remove(instanceId); + // 这一步也可能导致后面取不到 JobInfoDO + instanceId2JobInfo.remove(instanceId); + + // 上报日志数据 + getInstanceLogService().sync(instanceId); + } + + public static JobInfoDO fetchJobInfo(Long instanceId) { + return instanceId2JobInfo.get(instanceId); + } + private static InstanceInfoRepository getInstanceInfoRepository() { while (instanceInfoRepository == null) { try { @@ -173,4 +191,15 @@ public class InstanceManager { } return dispatchService; } + + private static InstanceLogService getInstanceLogService() { + while (instanceLogService == null) { + try { + Thread.sleep(100); + }catch (Exception ignore) { + } + instanceLogService = SpringUtils.getBean(InstanceLogService.class); + } + return instanceLogService; + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java index 4216dffd..f2e0471b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java @@ -64,6 +64,8 @@ public class InstanceService { instanceInfoDO.setResult(SystemInstanceResult.STOPPED_BY_USER); instanceInfoRepository.saveAndFlush(instanceInfoDO); + InstanceManager.processFinishedInstance(instanceId); + /* 不可靠通知停止 TaskTracker 假如没有成功关闭,之后 TaskTracker 会再次 reportStatus,按照流程,instanceLog 会被更新为 RUNNING,开发者可以再次手动关闭 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java index 58c2e35b..5857ca7e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java @@ -12,9 +12,11 @@ import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoReposito import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.oms.server.service.DispatchService; +import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -48,6 +50,7 @@ public class InstanceStatusCheckService { @Resource private JobInfoRepository jobInfoRepository; + @Async("timingTaskExecutor") @Scheduled(fixedRate = 10000) public void timingStatusCheck() { Stopwatch stopwatch = Stopwatch.createStarted(); @@ -143,5 +146,7 @@ public class InstanceStatusCheckService { instance.setGmtModified(new Date()); instance.setResult(SystemInstanceResult.REPORT_TIMEOUT); instanceInfoRepository.saveAndFlush(instance); + + InstanceManager.processFinishedInstance(instance.getInstanceId()); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java index 5b38c504..29362fb4 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java @@ -21,6 +21,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; +import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -62,6 +63,7 @@ public class JobScheduleService { private static final long SCHEDULE_RATE = 15000; + @Async("timingTaskExecutor") @Scheduled(fixedRate = SCHEDULE_RATE) public void timingSchedule() { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java index 2a89762f..b09be7da 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -8,6 +8,7 @@ import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.common.utils.JsonUtils; import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor; import com.github.kfcfans.oms.worker.actors.TaskTrackerActor; +import com.github.kfcfans.oms.worker.background.OmsLogHandler; import com.github.kfcfans.oms.worker.background.ServerDiscoveryService; import com.github.kfcfans.oms.worker.background.WorkerHealthReporter; import com.github.kfcfans.oms.worker.common.OhMyConfig; @@ -118,9 +119,10 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di // 初始化定时任务 ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build(); - timingPool = Executors.newScheduledThreadPool(2, timingPoolFactory); + timingPool = Executors.newScheduledThreadPool(3, timingPoolFactory); timingPool.scheduleAtFixedRate(new WorkerHealthReporter(), 0, 15, TimeUnit.SECONDS); timingPool.scheduleAtFixedRate(() -> currentServer = ServerDiscoveryService.discovery(), 10, 10, TimeUnit.SECONDS); + timingPool.scheduleWithFixedDelay(OmsLogHandler.INSTANCE.logSubmitter, 0, 5, TimeUnit.SECONDS); log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch); }catch (Exception e) {