feat: finished DAILY_TIME_INTERVAL processor

This commit is contained in:
tjq 2023-02-25 23:07:45 +08:00
parent 3aa42819e4
commit 43dfc9a265
2 changed files with 19 additions and 15 deletions

View File

@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.TimeExpressionType;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -30,7 +31,8 @@ public class CoreScheduleTaskManager implements InitializingBean, DisposableBean
@Override @Override
public void afterPropertiesSet() { public void afterPropertiesSet() {
// 定时调度 // 定时调度
coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronJob), "Thread-ScheduleCronJob")); coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.CRON)), "Thread-ScheduleCronJob"));
coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleDailyTimeIntervalJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.DAILY_TIME_INTERVAL)), "Thread-ScheduleDailyTimeIntervalJob"));
coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronWorkflow", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronWorkflow), "Thread-ScheduleCronWorkflow")); coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronWorkflow", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronWorkflow), "Thread-ScheduleCronWorkflow"));
coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleFrequentJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleFrequentJob), "Thread-ScheduleFrequentJob")); coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleFrequentJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleFrequentJob), "Thread-ScheduleFrequentJob"));
// 数据清理 // 数据清理

View File

@ -68,23 +68,23 @@ public class PowerScheduleService {
public static final long SCHEDULE_RATE = 15000; public static final long SCHEDULE_RATE = 15000;
public void scheduleCronJob() { public void scheduleNormalJob(TimeExpressionType timeExpressionType) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
// 调度 CRON 表达式 JOB // 调度 CRON 表达式 JOB
try { try {
final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress()); final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
if (CollectionUtils.isEmpty(allAppIds)) { if (CollectionUtils.isEmpty(allAppIds)) {
log.info("[CronJobSchedule] current server has no app's job to schedule."); log.info("[NormalScheduler] current server has no app's job to schedule.");
return; return;
} }
scheduleCronJobCore(allAppIds); scheduleNormalJob0(timeExpressionType, allAppIds);
} catch (Exception e) { } catch (Exception e) {
log.error("[CronJobSchedule] schedule cron job failed.", e); log.error("[NormalScheduler] schedule cron job failed.", e);
} }
long cost = System.currentTimeMillis() - start; long cost = System.currentTimeMillis() - start;
log.info("[CronJobSchedule] cron job schedule use {} ms.", cost); log.info("[NormalScheduler] {} job schedule use {} ms.", timeExpressionType, cost);
if (cost > SCHEDULE_RATE) { if (cost > SCHEDULE_RATE) {
log.warn("[CronJobSchedule] The database query is using too much time({}ms), please check if the database load is too high!", cost); log.warn("[NormalScheduler] The database query is using too much time({}ms), please check if the database load is too high!", cost);
} }
} }
@ -143,9 +143,11 @@ public class PowerScheduleService {
} }
/** /**
* 调度 CRON 表达式类型的任务 * 调度普通服务端计算表达式类型CRONDAILY_TIME_INTERVAL的任务
* @param timeExpressionType 表达式类型
* @param appIds appIds
*/ */
private void scheduleCronJobCore(List<Long> appIds) { private void scheduleNormalJob0(TimeExpressionType timeExpressionType, List<Long> appIds) {
long nowTime = System.currentTimeMillis(); long nowTime = System.currentTimeMillis();
long timeThreshold = nowTime + 2 * SCHEDULE_RATE; long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
@ -154,7 +156,7 @@ public class PowerScheduleService {
try { try {
// 查询条件任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行 // 查询条件任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行
List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), timeExpressionType.getV(), timeThreshold);
if (CollectionUtils.isEmpty(jobInfos)) { if (CollectionUtils.isEmpty(jobInfos)) {
return; return;
@ -162,7 +164,7 @@ public class PowerScheduleService {
// 1. 批量写日志表 // 1. 批量写日志表
Map<Long, Long> jobId2InstanceId = Maps.newHashMap(); Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
log.info("[CronScheduler] These cron jobs will be scheduled: {}.", jobInfos); log.info("[NormalScheduler] These {} jobs will be scheduled: {}.", timeExpressionType.name(), jobInfos);
jobInfos.forEach(jobInfo -> { jobInfos.forEach(jobInfo -> {
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime()).getInstanceId(); Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime()).getInstanceId();
@ -189,7 +191,7 @@ public class PowerScheduleService {
// 3. 计算下一次调度时间忽略5S内的重复执行即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms // 3. 计算下一次调度时间忽略5S内的重复执行即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms
jobInfos.forEach(jobInfoDO -> { jobInfos.forEach(jobInfoDO -> {
try { try {
refreshJob(jobInfoDO); refreshJob(timeExpressionType, jobInfoDO);
} catch (Exception e) { } catch (Exception e) {
log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e); log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e);
} }
@ -198,7 +200,7 @@ public class PowerScheduleService {
} catch (Exception e) { } catch (Exception e) {
log.error("[CronScheduler] schedule cron job failed.", e); log.error("[NormalScheduler] schedule {} job failed.", timeExpressionType.name(), e);
} }
}); });
} }
@ -284,9 +286,9 @@ public class PowerScheduleService {
}); });
} }
private void refreshJob(JobInfoDO jobInfo) { private void refreshJob(TimeExpressionType timeExpressionType, JobInfoDO jobInfo) {
LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle()); LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle());
Long nextTriggerTime = timingStrategyService.calculateNextTriggerTime(jobInfo.getNextTriggerTime(), TimeExpressionType.CRON, jobInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd()); Long nextTriggerTime = timingStrategyService.calculateNextTriggerTime(jobInfo.getNextTriggerTime(), timeExpressionType, jobInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd());
JobInfoDO updatedJobInfo = new JobInfoDO(); JobInfoDO updatedJobInfo = new JobInfoDO();
BeanUtils.copyProperties(jobInfo, updatedJobInfo); BeanUtils.copyProperties(jobInfo, updatedJobInfo);