finished log persistence

This commit is contained in:
tjq 2020-04-29 16:47:43 +08:00
parent 624eba41ef
commit 4fcc77e3c9
9 changed files with 149 additions and 9 deletions

View File

@ -0,0 +1,46 @@
package com.github.kfcfans.oms.server.common.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 公用线程池配置
*
* @author tjq
* @since 2020/4/28
*/
@EnableAsync
@Configuration
public class ThreadPoolConfig {
@Bean("timingTaskExecutor")
public Executor getTimingPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("timingTaskExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
}
@Bean("commonTaskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("commonTaskExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
}
}

View File

@ -1,6 +1,8 @@
package com.github.kfcfans.oms.server.persistence.local; package com.github.kfcfans.oms.server.persistence.local;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
@ -17,4 +19,6 @@ public interface LocalInstanceLogRepository extends JpaRepository<LocalInstanceL
// 删除数据 // 删除数据
long deleteByInstanceId(Long instanceId); long deleteByInstanceId(Long instanceId);
long deleteByInstanceIdInAndLogTimeLessThan(List<Long> instanceIds, Long t);
} }

View File

@ -9,7 +9,7 @@ import java.util.List;
/** /**
* 任务实例的运行时日志 * 任务实例的运行时日志
* *
* @author tjq * @author YuHuaFans余华的小说确实挺好看的...虽然看完总是要忧郁几天...
* @since 2020/4/27 * @since 2020/4/27
*/ */
@Data @Data

View File

@ -1,12 +1,16 @@
package com.github.kfcfans.oms.server.service; package com.github.kfcfans.oms.server.service;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.common.model.InstanceLogContent; import com.github.kfcfans.common.model.InstanceLogContent;
import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogDO; import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogDO;
import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogRepository; import com.github.kfcfans.oms.server.persistence.local.LocalInstanceLogRepository;
import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogDO; import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogDO;
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.FastDateFormat; import org.apache.commons.lang3.time.FastDateFormat;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
@ -15,10 +19,14 @@ import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update; import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -39,6 +47,9 @@ public class InstanceLogService {
@Resource @Resource
private LocalInstanceLogRepository localInstanceLogRepository; private LocalInstanceLogRepository localInstanceLogRepository;
// 本地维护了在线日志的任务实例ID
private final Set<Long> instanceIds = Sets.newConcurrentHashSet();
private static final String SPACE = " "; private static final String SPACE = " ";
private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS"; private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
@ -52,6 +63,8 @@ public class InstanceLogService {
public void submitLogs(String workerAddress, List<InstanceLogContent> logs) { public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {
List<LocalInstanceLogDO> logList = logs.stream().map(x -> { List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
instanceIds.add(x.getInstanceId());
LocalInstanceLogDO y = new LocalInstanceLogDO(); LocalInstanceLogDO y = new LocalInstanceLogDO();
BeanUtils.copyProperties(x, y); BeanUtils.copyProperties(x, y);
y.setWorkerAddress(workerAddress); y.setWorkerAddress(workerAddress);
@ -69,9 +82,15 @@ public class InstanceLogService {
* 将本地的任务实例运行日志同步到 mongoDB 存储在任务执行结束后异步执行 * 将本地的任务实例运行日志同步到 mongoDB 存储在任务执行结束后异步执行
* @param instanceId 任务实例ID * @param instanceId 任务实例ID
*/ */
@Async @Async("commonTaskExecutor")
public void sync(Long instanceId) { public void sync(Long instanceId) {
// 休眠10秒等待全部数据上报OmsLogHandler 每隔5秒上报数据
try {
TimeUnit.SECONDS.sleep(10);
}catch (Exception ignore) {
}
Stopwatch sw = Stopwatch.createStarted(); Stopwatch sw = Stopwatch.createStarted();
FastDateFormat dateFormat = FastDateFormat.getInstance(TIME_PATTERN); FastDateFormat dateFormat = FastDateFormat.getInstance(TIME_PATTERN);
@ -102,11 +121,12 @@ public class InstanceLogService {
// 删除本地数据 // 删除本地数据
try { try {
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
instanceIds.remove(instanceId);
log.debug("[InstanceLogService] sync local instanceLogs to mongoDB succeed, total logs: {},using: {}.", counter.get(), sw.stop());
}catch (Exception e) { }catch (Exception e) {
log.warn("[InstanceLogService] delete local instanceLogs failed.", e); log.warn("[InstanceLogService] delete local instanceLogs failed.", e);
} }
log.debug("[InstanceLogService] sync local instanceLogs to mongoDB succeed, total logs: {},using: {}.", counter.get(), sw.stop());
} }
private void saveToMongoDB(Long instanceId, List<String> logList, AtomicBoolean initialized) { private void saveToMongoDB(Long instanceId, List<String> logList, AtomicBoolean initialized) {
@ -131,4 +151,34 @@ public class InstanceLogService {
log.warn("[InstanceLogService] push instanceLog(instanceId={},logList={}) to mongoDB failed.", instanceId, logList, e); log.warn("[InstanceLogService] push instanceLog(instanceId={},logList={}) to mongoDB failed.", instanceId, logList, e);
} }
} }
@Async("timingTaskExecutor")
@Scheduled(fixedDelay = 60000)
public void timingCheck() {
// 1. 定时删除秒级任务的日志
List<Long> frequentInstanceIds = Lists.newLinkedList();
instanceIds.forEach(instanceId -> {
JobInfoDO jobInfo = InstanceManager.fetchJobInfo(instanceId);
if (jobInfo == null) {
return;
}
if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) {
frequentInstanceIds.add(instanceId);
}
});
if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
// 只保留最近10分钟的日志
long time = System.currentTimeMillis() - 10 * 60 * 1000;
Lists.partition(frequentInstanceIds, 100).forEach(p -> {
try {
localInstanceLogRepository.deleteByInstanceIdInAndLogTimeLessThan(p, time);
}catch (Exception e) {
log.warn("[InstanceLogService] delete expired logs for instance: {} failed.", p, e);
}
});
}
}
} }

View File

@ -9,6 +9,7 @@ import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.service.DispatchService; import com.github.kfcfans.oms.server.service.DispatchService;
import com.github.kfcfans.oms.server.service.InstanceLogService;
import com.github.kfcfans.oms.server.service.timing.schedule.HashedWheelTimerHolder; import com.github.kfcfans.oms.server.service.timing.schedule.HashedWheelTimerHolder;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -35,6 +36,7 @@ public class InstanceManager {
// Spring Bean // Spring Bean
private static DispatchService dispatchService; private static DispatchService dispatchService;
private static InstanceLogService instanceLogService;
private static InstanceInfoRepository instanceInfoRepository; private static InstanceInfoRepository instanceInfoRepository;
private static JobInfoRepository jobInfoRepository; private static JobInfoRepository jobInfoRepository;
@ -133,14 +135,30 @@ public class InstanceManager {
// 同步状态变更信息到数据库 // 同步状态变更信息到数据库
getInstanceInfoRepository().saveAndFlush(updateEntity); getInstanceInfoRepository().saveAndFlush(updateEntity);
// 清除已完成的实例信息
if (finished) { if (finished) {
instanceId2StatusHolder.remove(instanceId); processFinishedInstance(instanceId);
// 这一步也可能导致后面取不到 JobInfoDO
instanceId2JobInfo.remove(instanceId);
} }
} }
/**
* 收尾完成的任务实例
* @param instanceId 任务实例ID
*/
public static void processFinishedInstance(Long instanceId) {
// 清除已完成的实例信息
instanceId2StatusHolder.remove(instanceId);
// 这一步也可能导致后面取不到 JobInfoDO
instanceId2JobInfo.remove(instanceId);
// 上报日志数据
getInstanceLogService().sync(instanceId);
}
public static JobInfoDO fetchJobInfo(Long instanceId) {
return instanceId2JobInfo.get(instanceId);
}
private static InstanceInfoRepository getInstanceInfoRepository() { private static InstanceInfoRepository getInstanceInfoRepository() {
while (instanceInfoRepository == null) { while (instanceInfoRepository == null) {
try { try {
@ -173,4 +191,15 @@ public class InstanceManager {
} }
return dispatchService; return dispatchService;
} }
private static InstanceLogService getInstanceLogService() {
while (instanceLogService == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
instanceLogService = SpringUtils.getBean(InstanceLogService.class);
}
return instanceLogService;
}
} }

View File

@ -64,6 +64,8 @@ public class InstanceService {
instanceInfoDO.setResult(SystemInstanceResult.STOPPED_BY_USER); instanceInfoDO.setResult(SystemInstanceResult.STOPPED_BY_USER);
instanceInfoRepository.saveAndFlush(instanceInfoDO); instanceInfoRepository.saveAndFlush(instanceInfoDO);
InstanceManager.processFinishedInstance(instanceId);
/* /*
不可靠通知停止 TaskTracker 不可靠通知停止 TaskTracker
假如没有成功关闭之后 TaskTracker 会再次 reportStatus按照流程instanceLog 会被更新为 RUNNING开发者可以再次手动关闭 假如没有成功关闭之后 TaskTracker 会再次 reportStatus按照流程instanceLog 会被更新为 RUNNING开发者可以再次手动关闭

View File

@ -12,9 +12,11 @@ import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoReposito
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.service.DispatchService; import com.github.kfcfans.oms.server.service.DispatchService;
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -48,6 +50,7 @@ public class InstanceStatusCheckService {
@Resource @Resource
private JobInfoRepository jobInfoRepository; private JobInfoRepository jobInfoRepository;
@Async("timingTaskExecutor")
@Scheduled(fixedRate = 10000) @Scheduled(fixedRate = 10000)
public void timingStatusCheck() { public void timingStatusCheck() {
Stopwatch stopwatch = Stopwatch.createStarted(); Stopwatch stopwatch = Stopwatch.createStarted();
@ -143,5 +146,7 @@ public class InstanceStatusCheckService {
instance.setGmtModified(new Date()); instance.setGmtModified(new Date());
instance.setResult(SystemInstanceResult.REPORT_TIMEOUT); instance.setResult(SystemInstanceResult.REPORT_TIMEOUT);
instanceInfoRepository.saveAndFlush(instance); instanceInfoRepository.saveAndFlush(instance);
InstanceManager.processFinishedInstance(instance.getInstanceId());
} }
} }

View File

@ -21,6 +21,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -62,6 +63,7 @@ public class JobScheduleService {
private static final long SCHEDULE_RATE = 15000; private static final long SCHEDULE_RATE = 15000;
@Async("timingTaskExecutor")
@Scheduled(fixedRate = SCHEDULE_RATE) @Scheduled(fixedRate = SCHEDULE_RATE)
public void timingSchedule() { public void timingSchedule() {

View File

@ -8,6 +8,7 @@ import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.common.utils.JsonUtils; import com.github.kfcfans.common.utils.JsonUtils;
import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor; import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor;
import com.github.kfcfans.oms.worker.actors.TaskTrackerActor; import com.github.kfcfans.oms.worker.actors.TaskTrackerActor;
import com.github.kfcfans.oms.worker.background.OmsLogHandler;
import com.github.kfcfans.oms.worker.background.ServerDiscoveryService; import com.github.kfcfans.oms.worker.background.ServerDiscoveryService;
import com.github.kfcfans.oms.worker.background.WorkerHealthReporter; import com.github.kfcfans.oms.worker.background.WorkerHealthReporter;
import com.github.kfcfans.oms.worker.common.OhMyConfig; import com.github.kfcfans.oms.worker.common.OhMyConfig;
@ -118,9 +119,10 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
// 初始化定时任务 // 初始化定时任务
ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build(); ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build();
timingPool = Executors.newScheduledThreadPool(2, timingPoolFactory); timingPool = Executors.newScheduledThreadPool(3, timingPoolFactory);
timingPool.scheduleAtFixedRate(new WorkerHealthReporter(), 0, 15, TimeUnit.SECONDS); timingPool.scheduleAtFixedRate(new WorkerHealthReporter(), 0, 15, TimeUnit.SECONDS);
timingPool.scheduleAtFixedRate(() -> currentServer = ServerDiscoveryService.discovery(), 10, 10, TimeUnit.SECONDS); timingPool.scheduleAtFixedRate(() -> currentServer = ServerDiscoveryService.discovery(), 10, 10, TimeUnit.SECONDS);
timingPool.scheduleWithFixedDelay(OmsLogHandler.INSTANCE.logSubmitter, 0, 5, TimeUnit.SECONDS);
log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch); log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch);
}catch (Exception e) { }catch (Exception e) {