From 43dfc9a2657b2408a6dc74d458063cac0d736d2c Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 25 Feb 2023 23:07:45 +0800 Subject: [PATCH] feat: finished DAILY_TIME_INTERVAL processor --- .../scheduler/CoreScheduleTaskManager.java | 4 ++- .../core/scheduler/PowerScheduleService.java | 30 ++++++++++--------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CoreScheduleTaskManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CoreScheduleTaskManager.java index c36399fe..4a46e497 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CoreScheduleTaskManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CoreScheduleTaskManager.java @@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Service; +import tech.powerjob.common.enums.TimeExpressionType; import java.util.ArrayList; import java.util.List; @@ -30,7 +31,8 @@ public class CoreScheduleTaskManager implements InitializingBean, DisposableBean @Override public void afterPropertiesSet() { // 定时调度 - coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronJob), "Thread-ScheduleCronJob")); + coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.CRON)), "Thread-ScheduleCronJob")); + coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleDailyTimeIntervalJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.DAILY_TIME_INTERVAL)), "Thread-ScheduleDailyTimeIntervalJob")); coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronWorkflow", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronWorkflow), "Thread-ScheduleCronWorkflow")); coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleFrequentJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleFrequentJob), "Thread-ScheduleFrequentJob")); // 数据清理 diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java index 5f65fa07..1b4fd5e8 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java @@ -68,23 +68,23 @@ public class PowerScheduleService { public static final long SCHEDULE_RATE = 15000; - public void scheduleCronJob() { + public void scheduleNormalJob(TimeExpressionType timeExpressionType) { long start = System.currentTimeMillis(); // 调度 CRON 表达式 JOB try { final List allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress()); if (CollectionUtils.isEmpty(allAppIds)) { - log.info("[CronJobSchedule] current server has no app's job to schedule."); + log.info("[NormalScheduler] current server has no app's job to schedule."); return; } - scheduleCronJobCore(allAppIds); + scheduleNormalJob0(timeExpressionType, allAppIds); } catch (Exception e) { - log.error("[CronJobSchedule] schedule cron job failed.", e); + log.error("[NormalScheduler] schedule cron job failed.", e); } long cost = System.currentTimeMillis() - start; - log.info("[CronJobSchedule] cron job schedule use {} ms.", cost); + log.info("[NormalScheduler] {} job schedule use {} ms.", timeExpressionType, cost); if (cost > SCHEDULE_RATE) { - log.warn("[CronJobSchedule] The database query is using too much time({}ms), please check if the database load is too high!", cost); + log.warn("[NormalScheduler] The database query is using too much time({}ms), please check if the database load is too high!", cost); } } @@ -143,9 +143,11 @@ public class PowerScheduleService { } /** - * 调度 CRON 表达式类型的任务 + * 调度普通服务端计算表达式类型(CRON、DAILY_TIME_INTERVAL)的任务 + * @param timeExpressionType 表达式类型 + * @param appIds appIds */ - private void scheduleCronJobCore(List appIds) { + private void scheduleNormalJob0(TimeExpressionType timeExpressionType, List appIds) { long nowTime = System.currentTimeMillis(); long timeThreshold = nowTime + 2 * SCHEDULE_RATE; @@ -154,7 +156,7 @@ public class PowerScheduleService { try { // 查询条件:任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行 - List jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); + List jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), timeExpressionType.getV(), timeThreshold); if (CollectionUtils.isEmpty(jobInfos)) { return; @@ -162,7 +164,7 @@ public class PowerScheduleService { // 1. 批量写日志表 Map jobId2InstanceId = Maps.newHashMap(); - log.info("[CronScheduler] These cron jobs will be scheduled: {}.", jobInfos); + log.info("[NormalScheduler] These {} jobs will be scheduled: {}.", timeExpressionType.name(), jobInfos); jobInfos.forEach(jobInfo -> { Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime()).getInstanceId(); @@ -189,7 +191,7 @@ public class PowerScheduleService { // 3. 计算下一次调度时间(忽略5S内的重复执行,即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms) jobInfos.forEach(jobInfoDO -> { try { - refreshJob(jobInfoDO); + refreshJob(timeExpressionType, jobInfoDO); } catch (Exception e) { log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e); } @@ -198,7 +200,7 @@ public class PowerScheduleService { } catch (Exception e) { - log.error("[CronScheduler] schedule cron job failed.", e); + log.error("[NormalScheduler] schedule {} job failed.", timeExpressionType.name(), e); } }); } @@ -284,9 +286,9 @@ public class PowerScheduleService { }); } - private void refreshJob(JobInfoDO jobInfo) { + private void refreshJob(TimeExpressionType timeExpressionType, JobInfoDO jobInfo) { LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle()); - Long nextTriggerTime = timingStrategyService.calculateNextTriggerTime(jobInfo.getNextTriggerTime(), TimeExpressionType.CRON, jobInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd()); + Long nextTriggerTime = timingStrategyService.calculateNextTriggerTime(jobInfo.getNextTriggerTime(), timeExpressionType, jobInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd()); JobInfoDO updatedJobInfo = new JobInfoDO(); BeanUtils.copyProperties(jobInfo, updatedJobInfo);