diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/TimeUtils.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/TimeUtils.java index 09df140f..664a671c 100644 --- a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/TimeUtils.java +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/TimeUtils.java @@ -1,13 +1,18 @@ package tech.powerjob.server.common.utils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import tech.powerjob.common.RemoteConstant; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.commons.net.ntp.NTPUDPClient; import org.apache.commons.net.ntp.NtpV3Packet; import org.apache.commons.net.ntp.TimeInfo; +import tech.powerjob.server.common.SJ; import java.net.InetAddress; +import java.text.ParseException; +import java.util.Date; import java.util.List; /** @@ -24,7 +29,44 @@ public class TimeUtils { // 最大误差 5S private static final long MAX_OFFSET = 5000; - public static void check() throws TimeCheckException { + /** + * 计算 CRON 表达式的下一次执行时间 + * @param cron CRON 表达式 + * @param startTime 下一次执行时间的起始时间(不得早于该时间) + * @param sortedLifeCycle 额外的生命周期,格式为 ts1-ts2,ts3-ts4,下一次执行时间必须符合该区间范围 + * @return 执行时间 or NULL + * @throws ParseException CRON 表达式解析异常 + */ + public static Date calculateNextCronTime(String cron, long startTime, String sortedLifeCycle) throws ParseException { + CronExpression ce = new CronExpression(cron); + if (StringUtils.isEmpty(sortedLifeCycle)) { + return ce.getNextValidTimeAfter(new Date(startTime)); + } + + List> remainingLifecycle = Lists.newLinkedList(); + List lifeCycle = SJ.COMMA_SPLITTER.splitToList(sortedLifeCycle); + + // 只保留结束时间在当前时间前的生命周期 + lifeCycle.forEach(range -> { + String[] split = range.split("-"); + long end = Long.parseLong(split[1]); + if (end < startTime) { + return; + } + remainingLifecycle.add(Pair.of(Long.valueOf(split[0]), end)); + }); + + for (Pair range : remainingLifecycle) { + long newStartTime = Math.max(range.getLeft(), startTime); + Date nextValidTime = ce.getNextValidTimeAfter(new Date(newStartTime)); + if (nextValidTime != null && nextValidTime.getTime() <= range.getRight()) { + return nextValidTime; + } + } + return null; + } + + public static void checkServerTime() throws TimeCheckException { NTPUDPClient timeClient = new NTPUDPClient(); @@ -50,7 +92,7 @@ public class TimeUtils { } } throw new TimeCheckException("no available ntp server, maybe alibaba, sjtu and apple are both collapse"); - }finally { + } finally { timeClient.close(); } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java index b311ce4e..e6405f39 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java @@ -1,23 +1,5 @@ package tech.powerjob.server.core.scheduler; -import tech.powerjob.common.enums.InstanceStatus; -import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; -import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.common.utils.CronExpression; -import tech.powerjob.server.persistence.remote.model.AppInfoDO; -import tech.powerjob.server.persistence.remote.model.JobInfoDO; -import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; -import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; -import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; -import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; -import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; -import tech.powerjob.server.core.DispatchService; -import tech.powerjob.server.core.service.JobService; -import tech.powerjob.server.remote.worker.WorkerClusterManagerService; -import tech.powerjob.server.core.instance.InstanceService; -import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; -import tech.powerjob.server.core.workflow.WorkflowInstanceManager; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -28,6 +10,24 @@ import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; +import tech.powerjob.server.common.utils.TimeUtils; +import tech.powerjob.server.core.DispatchService; +import tech.powerjob.server.core.instance.InstanceService; +import tech.powerjob.server.core.service.JobService; +import tech.powerjob.server.core.workflow.WorkflowInstanceManager; +import tech.powerjob.server.persistence.remote.model.AppInfoDO; +import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; +import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; +import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; +import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; +import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; +import tech.powerjob.server.remote.transport.starter.AkkaStarter; +import tech.powerjob.server.remote.worker.WorkerClusterManagerService; import javax.annotation.Resource; import java.text.ParseException; @@ -254,7 +254,7 @@ public class PowerScheduleService { } private void refreshJob(JobInfoDO jobInfo) throws ParseException { - Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression()); + Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression(), jobInfo.getLifecycle()); JobInfoDO updatedJobInfo = new JobInfoDO(); BeanUtils.copyProperties(jobInfo, updatedJobInfo); @@ -271,7 +271,7 @@ public class PowerScheduleService { } private void refreshWorkflow(WorkflowInfoDO wfInfo) throws ParseException { - Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); + Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression(), wfInfo.getLifecycle()); WorkflowInfoDO updateEntity = new WorkflowInfoDO(); BeanUtils.copyProperties(wfInfo, updateEntity); @@ -295,11 +295,10 @@ public class PowerScheduleService { * @return 下一次调度时间 * @throws ParseException 异常 */ - private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression) throws ParseException { + private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression, String lifecycle) throws ParseException { - CronExpression ce = new CronExpression(cronExpression); // 取最大值,防止长时间未调度任务被连续调度(原来DISABLE的任务突然被打开,不取最大值会补上过去所有的调度) long benchmarkTime = Math.max(System.currentTimeMillis(), preTriggerTime); - return ce.getNextValidTimeAfter(new Date(benchmarkTime)); + return TimeUtils.calculateNextCronTime(cronExpression, benchmarkTime, lifecycle); } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java index 9c198373..ef5f384c 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java @@ -1,28 +1,28 @@ package tech.powerjob.server.core.service; -import tech.powerjob.common.enums.InstanceStatus; -import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.common.PowerQuery; -import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.common.request.http.SaveJobInfoRequest; -import tech.powerjob.common.response.JobInfoDTO; -import tech.powerjob.server.common.SJ; -import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.common.utils.CronExpression; -import tech.powerjob.server.persistence.QueryConvertUtils; -import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; -import tech.powerjob.server.persistence.remote.model.JobInfoDO; -import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; -import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; -import tech.powerjob.server.core.DispatchService; -import tech.powerjob.server.remote.server.redirector.DesignateServer; -import tech.powerjob.server.core.instance.InstanceService; -import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.data.jpa.domain.Specification; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; +import tech.powerjob.common.PowerQuery; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.request.http.SaveJobInfoRequest; +import tech.powerjob.common.response.JobInfoDTO; +import tech.powerjob.server.common.SJ; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; +import tech.powerjob.server.common.utils.TimeUtils; +import tech.powerjob.server.core.DispatchService; +import tech.powerjob.server.core.instance.InstanceService; +import tech.powerjob.server.persistence.QueryConvertUtils; +import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; +import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; +import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; +import tech.powerjob.server.remote.server.redirector.DesignateServer; import javax.annotation.Resource; import java.text.ParseException; @@ -238,8 +238,7 @@ public class JobService { TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); if (timeExpressionType == TimeExpressionType.CRON) { - CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); - Date nextValidTime = cronExpression.getNextValidTimeAfter(now); + Date nextValidTime = TimeUtils.calculateNextCronTime(jobInfoDO.getTimeExpression(), System.currentTimeMillis(), jobInfoDO.getLifecycle()); if (nextValidTime == null) { throw new PowerJobException("cron expression is out of date: " + jobInfoDO.getTimeExpression()); } diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/common/utils/TimeUtilsTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/common/utils/TimeUtilsTest.java new file mode 100644 index 00000000..d62ee123 --- /dev/null +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/common/utils/TimeUtilsTest.java @@ -0,0 +1,44 @@ +package tech.powerjob.server.common.utils; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Date; + +@Slf4j +class TimeUtilsTest { + + private static final String CRON = "0 0/5 * * * ? *"; + + @Test + void testEmptyOfLifecycle() throws Exception { + Date date = TimeUtils.calculateNextCronTime(CRON, System.currentTimeMillis(), null); + log.info("testEmptyOfLifecycle: {}", date); + Assertions.assertNotNull(date); + } + + @Test + void testEndOfLifecycle() throws Exception { + Date date = TimeUtils.calculateNextCronTime(CRON, System.currentTimeMillis(), "1217633201188-1417633201188"); + log.info("testEndOfLifecycle: {}", date); + Assertions.assertNull(date); + } + + @Test + void testMultiLifecycle() throws Exception { + String lifecycle = "1217633201188-1417633201188,1417633201188-1717633201188"; + Date date1 = TimeUtils.calculateNextCronTime(CRON, System.currentTimeMillis(), lifecycle); + Date date2 = TimeUtils.calculateNextCronTime(CRON, System.currentTimeMillis(), null); + + log.info("testMultiLifecycle date1: {}", date1); + log.info("testMultiLifecycle date2: {}", date2); + + Assertions.assertEquals(date1, date2); + } + + @Test + void check() { + TimeUtils.checkServerTime(); + } +} \ No newline at end of file