diff --git a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java index 81391b73..a7dfdb9f 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java @@ -24,5 +24,9 @@ public class PowerJobDKey { public static final String IGNORED_NETWORK_INTERFACE_REGEX = "powerjob.network.interface.ignored"; public static final String WORKER_STATUS_CHECK_PERIOD = "powerjob.worker.status-check.normal.period"; + /** + * ms + */ + public static final String FREQUENCY_JOB_MAX_INTERVAL = "powerjob.server.frequency-job.max-interval"; } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/enums/TimeExpressionType.java b/powerjob-common/src/main/java/tech/powerjob/common/enums/TimeExpressionType.java index 49b5ad0b..30c10f2e 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/enums/TimeExpressionType.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/enums/TimeExpressionType.java @@ -3,6 +3,7 @@ package tech.powerjob.common.enums; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.ToString; import java.util.List; @@ -14,6 +15,7 @@ import java.util.List; */ @Getter @AllArgsConstructor +@ToString public enum TimeExpressionType { API(1), @@ -24,7 +26,11 @@ public enum TimeExpressionType { int v; - public static final List frequentTypes = Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v); + public static final List FREQUENT_TYPES = Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v); + /** + * 首次计算触发时间时必须计算出一个有效值 + */ + public static final List INSPECT_TYPES = Lists.newArrayList(CRON.v); public static TimeExpressionType of(int v) { for (TimeExpressionType type : values()) { diff --git a/powerjob-common/src/main/java/tech/powerjob/common/model/LifeCycle.java b/powerjob-common/src/main/java/tech/powerjob/common/model/LifeCycle.java new file mode 100644 index 00000000..66e1fb88 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/model/LifeCycle.java @@ -0,0 +1,28 @@ +package tech.powerjob.common.model; + +import lombok.Data; +import tech.powerjob.common.serialize.JsonUtils; + +/** + * @author Echo009 + * @since 2022/3/22 + */ +@Data +public class LifeCycle { + + public static final LifeCycle EMPTY_LIFE_CYCLE = new LifeCycle(); + + private Long start; + + private Long end; + + public static LifeCycle parse(String lifeCycle){ + try { + return JsonUtils.parseObject(lifeCycle,LifeCycle.class); + }catch (Exception e){ + // ignore + return EMPTY_LIFE_CYCLE; + } + } + +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java index 9b3887f7..d365269b 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java @@ -5,6 +5,7 @@ import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.AlarmConfig; +import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.utils.CommonUtils; import lombok.Data; import tech.powerjob.common.response.JobInfoDTO; @@ -132,7 +133,7 @@ public class SaveJobInfoRequest { private DispatchStrategy dispatchStrategy; - private String lifecycle; + private LifeCycle lifecycle; /** * alarm config */ diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveWorkflowRequest.java b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveWorkflowRequest.java index d542aa89..9feee9b8 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveWorkflowRequest.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveWorkflowRequest.java @@ -1,6 +1,7 @@ package tech.powerjob.common.request.http; import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.utils.CommonUtils; import com.google.common.collect.Lists; @@ -63,6 +64,8 @@ public class SaveWorkflowRequest implements Serializable { /** 点线表示法*/ private PEWorkflowDAG dag; + private LifeCycle lifeCycle; + public void valid() { CommonUtils.requireNonNull(wfName, "workflow name can't be empty"); CommonUtils.requireNonNull(appId, "appId can't be empty"); diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 7d2f4a1b..98ec3f71 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -248,6 +248,11 @@ ${vertx-web.version} + + com.cronutils + cron-utils + 9.1.6 + diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java index fae8fe59..a21e21a7 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java @@ -111,7 +111,7 @@ public class DispatchService { long current = System.currentTimeMillis(); Integer maxInstanceNum = jobInfo.getMaxInstanceNum(); // 秒级任务只派发到一台机器,具体的 maxInstanceNum 由 TaskTracker 控制 - if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) { + if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) { maxInstanceNum = 1; } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java index 950a0222..f24f776c 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java @@ -354,7 +354,7 @@ public class InstanceLogService { instanceId2LastReportTime.keySet().forEach(instanceId -> { try { JobInfoDO jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(instanceId); - if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) { + if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) { frequentInstanceIds.add(instanceId); } }catch (Exception ignore) { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java index 338807f0..a4751c0b 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java @@ -7,6 +7,7 @@ import org.springframework.util.StringUtils; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.Protocol; import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.request.ServerStopInstanceReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import tech.powerjob.server.common.module.WorkerInfo; @@ -96,26 +97,28 @@ public class InstanceManager { // FREQUENT 任务没有失败重试机制,TaskTracker一直运行即可,只需要将存活信息同步到DB即可 // FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行) // 综上,直接把 status 和 runningNum 同步到DB即可 - if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) { + if (TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType)) { // 如果实例处于失败状态,则说明该 worker 失联了一段时间,被 server 判定为宕机,而此时该秒级任务有可能已经重新派发了,故需要 Kill 掉该实例 // fix issue 375 if (instanceInfo.getStatus() == InstanceStatus.FAILED.getV()) { log.warn("[InstanceManager-{}] receive TaskTracker's report: {}, but current instance is already failed, this instance should be killed.", instanceId, req); - Optional workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress()); - if (workerInfoOpt.isPresent()){ - ServerStopInstanceReq stopInstanceReq = new ServerStopInstanceReq(instanceId); - WorkerInfo workerInfo = workerInfoOpt.get(); - transportService.tell(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), stopInstanceReq); - } + stopInstance(instanceId, instanceInfo); return; } - instanceInfo.setStatus(receivedInstanceStatus.getV()); + LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle()); + // 检查生命周期是否已结束 + if (lifeCycle.getEnd() != null && lifeCycle.getEnd() <= System.currentTimeMillis()) { + stopInstance(instanceId, instanceInfo); + instanceInfo.setStatus(InstanceStatus.SUCCEED.getV()); + } else { + instanceInfo.setStatus(receivedInstanceStatus.getV()); + } instanceInfo.setResult(req.getResult()); instanceInfo.setRunningTimes(req.getTotalTaskNum()); instanceInfoRepository.saveAndFlush(instanceInfo); // 任务需要告警 if (req.isNeedAlert()) { - log.info("[InstanceManager-{}] receive frequent task alert req,time:{},content:{}",instanceId,req.getReportTime(),req.getAlertContent()); + log.info("[InstanceManager-{}] receive frequent task alert req,time:{},content:{}", instanceId, req.getReportTime(), req.getAlertContent()); alert(instanceId, req.getAlertContent()); } return; @@ -166,6 +169,15 @@ public class InstanceManager { } + private void stopInstance(Long instanceId, InstanceInfoDO instanceInfo) { + Optional workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress()); + if (workerInfoOpt.isPresent()) { + ServerStopInstanceReq stopInstanceReq = new ServerStopInstanceReq(instanceId); + WorkerInfo workerInfo = workerInfoOpt.get(); + transportService.tell(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), stopInstanceReq); + } + } + /** * 收尾完成的任务实例 * diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java index 35feee2a..4ca3e327 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java @@ -148,7 +148,7 @@ public class InstanceStatusCheckService { SwitchableStatus switchableStatus = SwitchableStatus.of(jobInfoDO.getStatus()); // 如果任务已关闭,则不进行重试,将任务置为失败即可;秒级任务也直接置为失败,由派发器重新调度 - if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) { + if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType.getV())) { updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT); return; } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java index 977210cb..39ba20a1 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java @@ -2,9 +2,9 @@ package tech.powerjob.server.core.scheduler; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.model.LifeCycle; import tech.powerjob.server.remote.transport.starter.AkkaStarter; import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.common.utils.CronExpression; import tech.powerjob.server.persistence.remote.model.AppInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; @@ -30,7 +30,6 @@ import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; -import java.text.ParseException; import java.util.*; import java.util.stream.Collectors; @@ -69,6 +68,8 @@ public class PowerScheduleService { @Resource private JobService jobService; + @Resource + private TimingStrategyService timingStrategyService; private static final long SCHEDULE_RATE = 15000; @@ -223,7 +224,7 @@ public class PowerScheduleService { Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> { try { // 查询所有的秒级任务(只包含ID) - List jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.frequentTypes); + List jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.FREQUENT_TYPES); if (CollectionUtils.isEmpty(jobIds)) { return; } @@ -242,10 +243,21 @@ public class PowerScheduleService { return; } - log.info("[FrequentScheduler] These frequent jobs will be scheduled: {}.", notRunningJobIds); notRunningJobIds.forEach(jobId -> { Optional jobInfoOpt = jobInfoRepository.findById(jobId); - jobInfoOpt.ifPresent(jobInfoDO -> jobService.runJob(jobInfoDO.getAppId(), jobId, null, 0L)); + jobInfoOpt.ifPresent(jobInfoDO -> { + LifeCycle lifeCycle = LifeCycle.parse(jobInfoDO.getLifecycle()); + // 生命周期已经结束 + if (lifeCycle.getEnd() != null && lifeCycle.getEnd() < System.currentTimeMillis()) { + jobInfoDO.setStatus(SwitchableStatus.DISABLE.getV()); + jobInfoDO.setGmtModified(new Date()); + jobInfoRepository.saveAndFlush(jobInfoDO); + log.info("[FrequentScheduler] disable frequent job,id:{}.", jobInfoDO.getId()); + } else if (lifeCycle.getStart() == null || lifeCycle.getStart() < System.currentTimeMillis() + SCHEDULE_RATE * 2) { + log.info("[FrequentScheduler] schedule frequent job,id:{}.", jobInfoDO.getId()); + jobService.runJob(jobInfoDO.getAppId(), jobId, null, Optional.of(lifeCycle.getStart()).orElse(0L) - System.currentTimeMillis()); + } + }); }); } catch (Exception e) { log.error("[FrequentScheduler] schedule frequent job failed.", e); @@ -253,8 +265,9 @@ public class PowerScheduleService { }); } - private void refreshJob(JobInfoDO jobInfo) throws ParseException { - Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression()); + private void refreshJob(JobInfoDO jobInfo) { + LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle()); + Long nextTriggerTime = timingStrategyService.calculateNextTriggerTime(jobInfo.getNextTriggerTime(), TimeExpressionType.CRON, jobInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd()); JobInfoDO updatedJobInfo = new JobInfoDO(); BeanUtils.copyProperties(jobInfo, updatedJobInfo); @@ -263,15 +276,16 @@ public class PowerScheduleService { log.warn("[Job-{}] this job won't be scheduled anymore, system will set the status to DISABLE!", jobInfo.getId()); updatedJobInfo.setStatus(SwitchableStatus.DISABLE.getV()); } else { - updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime()); + updatedJobInfo.setNextTriggerTime(nextTriggerTime); } updatedJobInfo.setGmtModified(new Date()); jobInfoRepository.save(updatedJobInfo); } - private void refreshWorkflow(WorkflowInfoDO wfInfo) throws ParseException { - Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); + private void refreshWorkflow(WorkflowInfoDO wfInfo) { + LifeCycle lifeCycle = LifeCycle.parse(wfInfo.getLifecycle()); + Long nextTriggerTime = timingStrategyService.calculateNextTriggerTime(wfInfo.getNextTriggerTime(), TimeExpressionType.CRON, wfInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd()); WorkflowInfoDO updateEntity = new WorkflowInfoDO(); BeanUtils.copyProperties(wfInfo, updateEntity); @@ -280,26 +294,11 @@ public class PowerScheduleService { log.warn("[Workflow-{}] this workflow won't be scheduled anymore, system will set the status to DISABLE!", wfInfo.getId()); updateEntity.setStatus(SwitchableStatus.DISABLE.getV()); } else { - updateEntity.setNextTriggerTime(nextTriggerTime.getTime()); + updateEntity.setNextTriggerTime(nextTriggerTime); } updateEntity.setGmtModified(new Date()); workflowInfoRepository.save(updateEntity); } - /** - * 计算下次触发时间 - * - * @param preTriggerTime 前一次触发时间 - * @param cronExpression CRON 表达式 - * @return 下一次调度时间 - * @throws ParseException 异常 - */ - private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression) throws ParseException { - - CronExpression ce = new CronExpression(cronExpression); - // 取最大值,防止长时间未调度任务被连续调度(原来DISABLE的任务突然被打开,不取最大值会补上过去所有的调度) - long benchmarkTime = Math.max(System.currentTimeMillis(), preTriggerTime); - return ce.getNextValidTimeAfter(new Date(benchmarkTime)); - } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/TimingStrategyService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/TimingStrategyService.java new file mode 100644 index 00000000..2b7db845 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/TimingStrategyService.java @@ -0,0 +1,123 @@ +package tech.powerjob.server.core.scheduler; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.DateFormatUtils; +import org.springframework.stereotype.Service; +import tech.powerjob.common.OmsConstant; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.server.core.scheduler.auxiliary.TimingStrategyHandler; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * @author Echo009 + * @since 2022/3/21 + */ +@Slf4j +@Service +public class TimingStrategyService { + + private static final int NEXT_N_TIMES = 5; + + private static final List TIPS = Collections.singletonList("It is valid, but has not trigger time list!"); + + + private final Map strategyContainer; + + public TimingStrategyService(List timingStrategyHandlers) { + // init + strategyContainer = new EnumMap<>(TimeExpressionType.class); + for (TimingStrategyHandler timingStrategyHandler : timingStrategyHandlers) { + strategyContainer.put(timingStrategyHandler.supportType(), timingStrategyHandler); + } + } + + /** + * 计算接下来几次的调度时间 + * + * @param timeExpressionType 定时表达式类型 + * @param timeExpression 表达式 + * @param startTime 起始时间(include) + * @param endTime 结束时间(include) + * @return 调度时间列表 + */ + public List calculateNextTriggerTimes(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) { + + TimingStrategyHandler timingStrategyHandler = getHandler(timeExpressionType); + List triggerTimeList = new ArrayList<>(NEXT_N_TIMES); + Long nextTriggerTime = System.currentTimeMillis(); + do { + nextTriggerTime = timingStrategyHandler.calculateNextTriggerTime(nextTriggerTime, timeExpression, startTime, endTime); + if (nextTriggerTime == null) { + break; + } + triggerTimeList.add(nextTriggerTime); + } while (triggerTimeList.size() < NEXT_N_TIMES); + + if (triggerTimeList.isEmpty()) { + return TIPS; + } + return triggerTimeList.stream().map(t -> DateFormatUtils.format(t, OmsConstant.TIME_PATTERN)).collect(Collectors.toList()); + } + + /** + * 计算下次的调度时间 + * + * @param preTriggerTime 上次触发时间(nullable) + * @param timeExpressionType 定时表达式类型 + * @param timeExpression 表达式 + * @param startTime 起始时间(include) + * @param endTime 结束时间(include) + * @return 下次的调度时间 + */ + public Long calculateNextTriggerTime(Long preTriggerTime, TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) { + if (preTriggerTime == null || preTriggerTime < System.currentTimeMillis()) { + preTriggerTime = System.currentTimeMillis(); + } + return getHandler(timeExpressionType).calculateNextTriggerTime(preTriggerTime, timeExpression, startTime, endTime); + } + + + /** + * 计算下次的调度时间并检查校验规则 + * + * @param preTriggerTime 上次触发时间(nullable) + * @param timeExpressionType 定时表达式类型 + * @param timeExpression 表达式 + * @param startTime 起始时间(include) + * @param endTime 结束时间(include) + * @return 下次的调度时间 + */ + public Long calculateNextTriggerTimeWithInspection(Long preTriggerTime, TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) { + Long nextTriggerTime = calculateNextTriggerTime(preTriggerTime, timeExpressionType, timeExpression, startTime, endTime); + if (TimeExpressionType.INSPECT_TYPES.contains(timeExpressionType.getV()) && nextTriggerTime == null) { + throw new PowerJobException("time expression is out of date: " + timeExpression); + } + return nextTriggerTime; + } + + + public void validate(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) { + if (endTime != null) { + if (endTime <= System.currentTimeMillis()) { + throw new PowerJobException("lifecycle is out of date!"); + } + if (startTime != null && startTime > endTime) { + throw new PowerJobException("lifecycle is invalid! start time must earlier then end time."); + } + } + getHandler(timeExpressionType).validate(timeExpression); + } + + + private TimingStrategyHandler getHandler(TimeExpressionType timeExpressionType) { + TimingStrategyHandler timingStrategyHandler = strategyContainer.get(timeExpressionType); + if (timingStrategyHandler == null) { + throw new PowerJobException("No matching TimingStrategyHandler for this TimeExpressionType:" + timeExpressionType); + } + return timingStrategyHandler; + } + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/AbstractTimingStrategyHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/AbstractTimingStrategyHandler.java new file mode 100644 index 00000000..897998ce --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/AbstractTimingStrategyHandler.java @@ -0,0 +1,19 @@ +package tech.powerjob.server.core.scheduler.auxiliary; + + +/** + * @author Echo009 + * @since 2022/3/22 + */ +public abstract class AbstractTimingStrategyHandler implements TimingStrategyHandler { + @Override + public void validate(String timeExpression) { + // do nothing + } + + @Override + public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) { + // do nothing + return null; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/TimingStrategyHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/TimingStrategyHandler.java new file mode 100644 index 00000000..e7e0df74 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/TimingStrategyHandler.java @@ -0,0 +1,37 @@ +package tech.powerjob.server.core.scheduler.auxiliary; + +import tech.powerjob.common.enums.TimeExpressionType; + +/** + * @author Echo009 + * @since 2022/2/24 + */ +public interface TimingStrategyHandler { + + /** + * 校验表达式 + * + * @param timeExpression 时间表达式 + */ + void validate(String timeExpression); + + /** + * 计算下次触发时间 + * + * @param preTriggerTime 上次触发时间 (not null) + * @param timeExpression 时间表达式 + * @param startTime 开始时间(include) + * @param endTime 结束时间(include) + * @return next trigger time + */ + Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime); + + /** + * 支持的定时策略 + * + * @return TimeExpressionType + */ + TimeExpressionType supportType(); + + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/ApiTimingStrategyHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/ApiTimingStrategyHandler.java new file mode 100644 index 00000000..70538de5 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/ApiTimingStrategyHandler.java @@ -0,0 +1,17 @@ +package tech.powerjob.server.core.scheduler.auxiliary.impl; + +import org.springframework.stereotype.Component; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.server.core.scheduler.auxiliary.AbstractTimingStrategyHandler; + +/** + * @author Echo009 + * @since 2022/3/22 + */ +@Component +public class ApiTimingStrategyHandler extends AbstractTimingStrategyHandler { + @Override + public TimeExpressionType supportType() { + return TimeExpressionType.API; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/CronTimingStrategyHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/CronTimingStrategyHandler.java new file mode 100644 index 00000000..c48fd7fd --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/CronTimingStrategyHandler.java @@ -0,0 +1,77 @@ +package tech.powerjob.server.core.scheduler.auxiliary.impl; + +import com.cronutils.model.Cron; +import com.cronutils.model.definition.CronDefinition; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.model.time.ExecutionTime; +import com.cronutils.parser.CronParser; +import org.springframework.stereotype.Component; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.server.core.scheduler.auxiliary.TimingStrategyHandler; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Optional; + +/** + * @author Echo009 + * @since 2022/2/24 + */ +@Component +public class CronTimingStrategyHandler implements TimingStrategyHandler { + + private final CronParser cronParser; + + /** + * @see CronDefinitionBuilder#instanceDefinitionFor + *

+ * Enhanced quartz cron,Support for specifying both a day-of-week and a day-of-month parameter. + * https://github.com/PowerJob/PowerJob/issues/382 + */ + public CronTimingStrategyHandler() { + CronDefinition cronDefinition = CronDefinitionBuilder.defineCron() + .withSeconds().withValidRange(0, 59).and() + .withMinutes().withValidRange(0, 59).and() + .withHours().withValidRange(0, 23).and() + .withDayOfMonth().withValidRange(1, 31).supportsL().supportsW().supportsLW().supportsQuestionMark().and() + .withMonth().withValidRange(1, 12).and() + .withDayOfWeek().withValidRange(1, 7).withMondayDoWValue(2).supportsHash().supportsL().supportsQuestionMark().and() + .withYear().withValidRange(1970, 2099).withStrictRange().optional().and() + .instance(); + this.cronParser = new CronParser(cronDefinition); + } + + + @Override + public void validate(String timeExpression) { + cronParser.parse(timeExpression); + } + + @Override + public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) { + Cron cron = cronParser.parse(timeExpression); + ExecutionTime executionTime = ExecutionTime.forCron(cron); + if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) { + // 需要计算出离 startTime 最近的一次真正的触发时间 + Optional zonedDateTime = executionTime.lastExecution(ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault())); + preTriggerTime = zonedDateTime.map(dateTime -> dateTime.toEpochSecond() * 1000).orElse(startTime); + } + Instant instant = Instant.ofEpochMilli(preTriggerTime); + ZonedDateTime preZonedDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault()); + Optional opt = executionTime.nextExecution(preZonedDateTime); + if (opt.isPresent()) { + long nextTriggerTime = opt.get().toEpochSecond() * 1000; + if (endTime != null && endTime < nextTriggerTime) { + return null; + } + return nextTriggerTime; + } + return null; + } + + @Override + public TimeExpressionType supportType() { + return TimeExpressionType.CRON; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/FixedDelayTimingStrategyHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/FixedDelayTimingStrategyHandler.java new file mode 100644 index 00000000..00fb65fb --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/FixedDelayTimingStrategyHandler.java @@ -0,0 +1,38 @@ +package tech.powerjob.server.core.scheduler.auxiliary.impl; + +import org.springframework.stereotype.Component; +import tech.powerjob.common.PowerJobDKey; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.server.core.scheduler.auxiliary.AbstractTimingStrategyHandler; + +/** + * @author Echo009 + * @since 2022/3/22 + */ +@Component +public class FixedDelayTimingStrategyHandler extends AbstractTimingStrategyHandler { + + @Override + public void validate(String timeExpression) { + long delay; + try { + delay = Long.parseLong(timeExpression); + } catch (Exception e) { + throw new PowerJobException("invalid timeExpression!"); + } + // 默认 120s ,超过这个限制应该考虑使用其他类型以减少资源占用 + int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000")); + if (delay > maxInterval) { + throw new PowerJobException("the delay must be less than " + maxInterval + "ms"); + } + if (delay <= 0) { + throw new PowerJobException("the delay must be greater than 0 ms"); + } + } + + @Override + public TimeExpressionType supportType() { + return TimeExpressionType.FIXED_DELAY; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/FixedRateTimingStrategyHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/FixedRateTimingStrategyHandler.java new file mode 100644 index 00000000..186eb7ea --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/FixedRateTimingStrategyHandler.java @@ -0,0 +1,46 @@ +package tech.powerjob.server.core.scheduler.auxiliary.impl; + +import org.springframework.stereotype.Component; +import tech.powerjob.common.PowerJobDKey; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.server.core.scheduler.auxiliary.AbstractTimingStrategyHandler; + + +/** + * @author Echo009 + * @since 2022/3/22 + */ +@Component +public class FixedRateTimingStrategyHandler extends AbstractTimingStrategyHandler { + + @Override + public void validate(String timeExpression) { + long delay; + try { + delay = Long.parseLong(timeExpression); + } catch (Exception e) { + throw new PowerJobException("invalid timeExpression!"); + } + // 默认 120s ,超过这个限制应该使用考虑使用其他类型以减少资源占用 + int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000")); + if (delay > maxInterval) { + throw new PowerJobException("the rate must be less than " + maxInterval + "ms"); + } + if (delay <= 0) { + throw new PowerJobException("the rate must be greater than 0 ms"); + } + } + + @Override + public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) { + long r = startTime != null && startTime > preTriggerTime + ? startTime : preTriggerTime + Long.parseLong(timeExpression); + return endTime != null && endTime < r ? null : r; + } + + @Override + public TimeExpressionType supportType() { + return TimeExpressionType.FIXED_RATE; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/WorkflowTimingStrategyHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/WorkflowTimingStrategyHandler.java new file mode 100644 index 00000000..94a5e5ff --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/auxiliary/impl/WorkflowTimingStrategyHandler.java @@ -0,0 +1,17 @@ +package tech.powerjob.server.core.scheduler.auxiliary.impl; + +import org.springframework.stereotype.Component; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.server.core.scheduler.auxiliary.AbstractTimingStrategyHandler; + +/** + * @author Echo009 + * @since 2022/3/22 + */ +@Component +public class WorkflowTimingStrategyHandler extends AbstractTimingStrategyHandler { + @Override + public TimeExpressionType supportType() { + return TimeExpressionType.WORKFLOW; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java index 940b2ad1..732b791b 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java @@ -1,30 +1,31 @@ package tech.powerjob.server.core.service; import com.alibaba.fastjson.JSON; -import tech.powerjob.common.enums.InstanceStatus; -import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.common.PowerQuery; -import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.common.model.AlarmConfig; -import tech.powerjob.common.request.http.SaveJobInfoRequest; -import tech.powerjob.common.response.JobInfoDTO; -import tech.powerjob.server.common.SJ; -import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.common.utils.CronExpression; -import tech.powerjob.server.persistence.QueryConvertUtils; -import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; -import tech.powerjob.server.persistence.remote.model.JobInfoDO; -import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; -import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; -import tech.powerjob.server.core.DispatchService; -import tech.powerjob.server.remote.server.redirector.DesignateServer; -import tech.powerjob.server.core.instance.InstanceService; -import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.data.jpa.domain.Specification; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; +import tech.powerjob.common.PowerQuery; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.model.AlarmConfig; +import tech.powerjob.common.model.LifeCycle; +import tech.powerjob.common.request.http.SaveJobInfoRequest; +import tech.powerjob.common.response.JobInfoDTO; +import tech.powerjob.server.common.SJ; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; +import tech.powerjob.server.core.DispatchService; +import tech.powerjob.server.core.instance.InstanceService; +import tech.powerjob.server.core.scheduler.TimingStrategyService; +import tech.powerjob.server.persistence.QueryConvertUtils; +import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; +import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; +import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; +import tech.powerjob.server.remote.server.redirector.DesignateServer; import javax.annotation.Resource; import java.text.ParseException; @@ -52,6 +53,8 @@ public class JobService { private JobInfoRepository jobInfoRepository; @Resource private InstanceInfoRepository instanceInfoRepository; + @Resource + private TimingStrategyService timingStrategyService; /** @@ -59,9 +62,8 @@ public class JobService { * * @param request 任务请求 * @return 创建的任务ID(jobId) - * @throws ParseException 异常 */ - public Long saveJob(SaveJobInfoRequest request) throws ParseException { + public Long saveJob(SaveJobInfoRequest request) { request.valid(); @@ -89,11 +91,13 @@ public class JobService { if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) { jobInfoDO.setNotifyUserIds(SJ.COMMA_JOINER.join(request.getNotifyUserIds())); } - + LifeCycle lifecycle = Optional.of(request.getLifecycle()).orElse(LifeCycle.EMPTY_LIFE_CYCLE); + jobInfoDO.setLifecycle(JSON.toJSONString(lifecycle)); + // 检查定时策略 + timingStrategyService.validate(request.getTimeExpressionType(), request.getTimeExpression(), lifecycle.getStart(), lifecycle.getEnd()); calculateNextTriggerTime(jobInfoDO); if (request.getId() == null) { jobInfoDO.setGmtCreate(new Date()); - } // 检查告警配置 if (request.getAlarmConfig() != null) { @@ -224,7 +228,7 @@ public class JobService { jobInfoRepository.saveAndFlush(jobInfoDO); // 2. 关闭秒级任务 - if (!TimeExpressionType.frequentTypes.contains(jobInfoDO.getTimeExpressionType())) { + if (!TimeExpressionType.FREQUENT_TYPES.contains(jobInfoDO.getTimeExpressionType())) { return; } List executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.GENERALIZED_RUNNING_STATUS); @@ -244,23 +248,18 @@ public class JobService { }); } - private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws ParseException { + private void calculateNextTriggerTime(JobInfoDO jobInfo) { // 计算下次调度时间 - Date now = new Date(); - TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); - - if (timeExpressionType == TimeExpressionType.CRON) { - CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); - Date nextValidTime = cronExpression.getNextValidTimeAfter(now); - if (nextValidTime == null) { - throw new PowerJobException("cron expression is out of date: " + jobInfoDO.getTimeExpression()); - } - jobInfoDO.setNextTriggerTime(nextValidTime.getTime()); - } else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) { - jobInfoDO.setTimeExpression(null); + if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) { + // 固定频率类型的任务不计算 + jobInfo.setTimeExpression(null); + } else { + LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle()); + Long nextValidTime = timingStrategyService.calculateNextTriggerTimeWithInspection(jobInfo.getNextTriggerTime(), TimeExpressionType.CRON, jobInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd()); + jobInfo.setNextTriggerTime(nextValidTime); } // 重写最后修改时间 - jobInfoDO.setGmtModified(now); + jobInfo.setGmtModified(new Date()); } private void fillDefaultValue(JobInfoDO jobInfoDO) { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/ValidateService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/ValidateService.java deleted file mode 100644 index b74963c4..00000000 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/ValidateService.java +++ /dev/null @@ -1,71 +0,0 @@ -package tech.powerjob.server.core.service; - -import tech.powerjob.common.OmsConstant; -import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.server.common.utils.CronExpression; -import com.google.common.collect.Lists; -import org.apache.commons.lang3.time.DateFormatUtils; - -import java.text.ParseException; -import java.util.Collections; -import java.util.Date; -import java.util.List; - -/** - * 校验服务 - * - * @author tjq - * @since 2020/11/28 - */ -public class ValidateService { - - private static final int NEXT_N_TIMES = 5; - - /** - * 计算指定时间表达式接下来的运行状况 - * @param timeExpressionType 时间表达式类型 - * @param timeExpression 时间表达式 - * @return 最近 N 次运行的时间 - * @throws Exception 异常 - */ - public static List calculateNextTriggerTime(TimeExpressionType timeExpressionType, String timeExpression) throws Exception { - switch (timeExpressionType) { - case API: return Lists.newArrayList(OmsConstant.NONE); - case WORKFLOW: return Lists.newArrayList("VALID: depends on workflow"); - case CRON: return calculateCronExpression(timeExpression); - case FIXED_RATE: return calculateFixRate(timeExpression); - case FIXED_DELAY: return Lists.newArrayList("VALID: depends on execution cost time"); - } - // impossible - return Collections.emptyList(); - } - - - private static List calculateFixRate(String timeExpression) { - List result = Lists.newArrayList(); - long delay = Long.parseLong(timeExpression); - for (int i = 0; i < NEXT_N_TIMES; i++) { - long nextTime = System.currentTimeMillis() + i * delay; - result.add(DateFormatUtils.format(nextTime, OmsConstant.TIME_PATTERN)); - } - return result; - } - - private static List calculateCronExpression(String expression) throws ParseException { - CronExpression cronExpression = new CronExpression(expression); - List result = Lists.newArrayList(); - Date time = new Date(); - for (int i = 0; i < NEXT_N_TIMES; i++) { - Date nextValidTime = cronExpression.getNextValidTimeAfter(time); - if (nextValidTime == null) { - break; - } - result.add(DateFormatUtils.format(nextValidTime.getTime(), OmsConstant.TIME_PATTERN)); - time = nextValidTime; - } - if (result.isEmpty()) { - result.add("INVALID: no next validate schedule time"); - } - return result; - } -} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java index c9afb59e..d6ef06e8 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java @@ -10,13 +10,14 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.request.http.SaveWorkflowNodeRequest; import tech.powerjob.common.request.http.SaveWorkflowRequest; import tech.powerjob.server.common.SJ; import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; -import tech.powerjob.server.common.utils.CronExpression; +import tech.powerjob.server.core.scheduler.TimingStrategyService; import tech.powerjob.server.core.service.NodeValidateService; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; @@ -28,7 +29,6 @@ import tech.powerjob.server.remote.server.redirector.DesignateServer; import javax.annotation.Resource; import javax.transaction.Transactional; -import java.text.ParseException; import java.util.*; /** @@ -51,6 +51,8 @@ public class WorkflowService { private WorkflowNodeInfoRepository workflowNodeInfoRepository; @Resource private NodeValidateService nodeValidateService; + @Resource + private TimingStrategyService timingStrategyService; /** * 保存/修改工作流信息 @@ -61,7 +63,7 @@ public class WorkflowService { * @return 工作流ID */ @Transactional(rollbackOn = Exception.class) - public Long saveWorkflow(SaveWorkflowRequest req) throws ParseException { + public Long saveWorkflow(SaveWorkflowRequest req) { req.valid(); @@ -83,14 +85,16 @@ public class WorkflowService { if (req.getNotifyUserIds() != null) { wf.setNotifyUserIds(SJ.COMMA_JOINER.join(req.getNotifyUserIds())); } - - // 计算 NextTriggerTime - if (req.getTimeExpressionType() == TimeExpressionType.CRON) { - CronExpression cronExpression = new CronExpression(req.getTimeExpression()); - Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date()); - wf.setNextTriggerTime(nextValidTime.getTime()); - } else { + if (req.getLifeCycle() != null) { + wf.setLifecycle(JSON.toJSONString(req.getLifeCycle())); + } + if (TimeExpressionType.FREQUENT_TYPES.contains(req.getTimeExpressionType().getV())){ + // 固定频率类型的任务不计算 wf.setTimeExpression(null); + }else { + LifeCycle lifeCycle = Optional.of(req.getLifeCycle()).orElse(LifeCycle.EMPTY_LIFE_CYCLE); + Long nextValidTime = timingStrategyService.calculateNextTriggerTimeWithInspection(wf.getNextTriggerTime(), TimeExpressionType.CRON, wf.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd()); + wf.setNextTriggerTime(nextValidTime); } // 新增工作流,需要先 save 一下获取 ID if (wfId == null) { diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ValidateController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ValidateController.java index e8cacdf3..4b0e2c2a 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ValidateController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ValidateController.java @@ -1,30 +1,41 @@ package tech.powerjob.server.web.controller; -import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.common.response.ResultDTO; -import tech.powerjob.server.core.service.ValidateService; import com.google.common.collect.Lists; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.response.ResultDTO; +import tech.powerjob.server.core.scheduler.TimingStrategyService; +import javax.annotation.Resource; import java.util.List; /** * 校验控制器 * * @author tjq + * @author Echo009 * @since 2020/11/28 */ @RestController @RequestMapping("/validate") public class ValidateController { + @Resource + private TimingStrategyService timingStrategyService; + @GetMapping("/timeExpression") - public ResultDTO> checkTimeExpression(TimeExpressionType timeExpressionType, String timeExpression) { + public ResultDTO> checkTimeExpression(TimeExpressionType timeExpressionType, + String timeExpression, + @RequestParam(required = false) Long startTime, + @RequestParam(required = false) Long endTime + ) { try { - return ResultDTO.success(ValidateService.calculateNextTriggerTime(timeExpressionType, timeExpression)); + timingStrategyService.validate(timeExpressionType, timeExpression, startTime, endTime); + return ResultDTO.success(timingStrategyService.calculateNextTriggerTimes(timeExpressionType, timeExpression, startTime, endTime)); } catch (Exception e) { return ResultDTO.success(Lists.newArrayList(ExceptionUtils.getMessage(e))); } diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/CronExpression.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/scheduler/CronExpression.java similarity index 99% rename from powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/CronExpression.java rename to powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/scheduler/CronExpression.java index 43260549..fdf46960 100644 --- a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/CronExpression.java +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/scheduler/CronExpression.java @@ -1,4 +1,4 @@ -package tech.powerjob.server.common.utils; +package tech.powerjob.server.core.scheduler; /* Copyright [2020] [PowerJob] diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/scheduler/CronTimingStrategyHandlerTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/scheduler/CronTimingStrategyHandlerTest.java new file mode 100644 index 00000000..084acee3 --- /dev/null +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/scheduler/CronTimingStrategyHandlerTest.java @@ -0,0 +1,93 @@ +package tech.powerjob.server.core.scheduler; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.DateFormatUtils; +import org.assertj.core.util.Lists; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import tech.powerjob.common.OmsConstant; +import tech.powerjob.server.core.scheduler.auxiliary.impl.CronTimingStrategyHandler; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Date; +import java.util.List; + +/** + * @author Echo009 + * @since 2022/3/29 + */ +@Slf4j +public class CronTimingStrategyHandlerTest { + + private final CronTimingStrategyHandler cronTimingStrategyHandler = new CronTimingStrategyHandler(); + + private static final List CRON_LIST = Lists.list( + "0 0 10,11,12 * * ?", + "0 0/30 9-17 * * ?", + "0 0 12 ? * WED", + "0 0 12 * * ?", + "0 15 11 ? * *", + "0 15 11 * * ?", + "0 15 11 * * ? *", + "0 15 11 * * ? 2088", + "0 * 14 * * ?", + "0 0/5 14 * * ?", + "0 0/5 14,18 * * ?", + "0 0-5 14 * * ?", + "0 10,44 14 ? 3 WED", + "0 15 11 ? * MON-FRI", + "0 15 11 15 * ?", + "0 15 11 L * ?", + "0 15 11 ? * 6L", + "0 15 11 ? * 6L 2087-2088", + "0 15 11 ? * 6L 2087-2088", + "0 15 11 ? * 6#3", + "0 0 0 1 1 ? 2088" + ); + + + @SneakyThrows + @Test + public void compareToQuartzCron() { + // 对比 quartz cron 结果 + for (String cron : CRON_LIST) { + Long referenceTime = System.currentTimeMillis(); + int count = 0; + while (referenceTime != null && count < 50) { + CronExpression cronExpression = new CronExpression(cron); + Date nextValidTimeAfter = cronExpression.getNextValidTimeAfter(new Date(referenceTime)); + Long quartzRes = nextValidTimeAfter == null ? null : nextValidTimeAfter.getTime(); + Long newRes = cronTimingStrategyHandler.calculateNextTriggerTime(referenceTime, cron, null, null); + log.info("cron:'{}',reference time:{},quartz result:{},new result:{}", cron, referenceTime, quartzRes, newRes); + referenceTime = newRes; + count++; + Assertions.assertEquals(newRes, quartzRes); + } + } + } + + @Test + public void test01() { + // cron 的有效区间小于 lifecycle + String cron = "0 15 11 * * ? 2088-2089"; + Long referenceTime = System.currentTimeMillis(); + LocalDateTime start = LocalDateTime.of(2088, 5, 1, 11, 15, 0); + LocalDateTime end = LocalDateTime.of(2099, 5, 1, 11, 15, 0); + Long nextTriggerTime = cronTimingStrategyHandler.calculateNextTriggerTime(referenceTime, cron, start.toEpochSecond(ZoneOffset.of("+8")) * 1000, end.toEpochSecond(ZoneOffset.of("+8")) * 1000); + Assertions.assertEquals("2088-05-01 11:15:00",DateFormatUtils.format(nextTriggerTime, OmsConstant.TIME_PATTERN)); + } + + + @Test + public void test02() { + // cron 的有效区间大于 lifecycle + String cron = "0 15 11 * * ? 2077-2099"; + Long referenceTime = System.currentTimeMillis(); + LocalDateTime start = LocalDateTime.of(2088, 5, 1, 11, 15, 0); + LocalDateTime end = LocalDateTime.of(2099, 5, 1, 11, 15, 0); + Long nextTriggerTime = cronTimingStrategyHandler.calculateNextTriggerTime(referenceTime, cron, start.toEpochSecond(ZoneOffset.of("+8")) * 1000, end.toEpochSecond(ZoneOffset.of("+8")) * 1000); + Assertions.assertEquals("2088-05-01 11:15:00",DateFormatUtils.format(nextTriggerTime, OmsConstant.TIME_PATTERN)); + } +} diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/scheduler/TimingStrategyServiceTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/scheduler/TimingStrategyServiceTest.java new file mode 100644 index 00000000..0e15da26 --- /dev/null +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/core/scheduler/TimingStrategyServiceTest.java @@ -0,0 +1,91 @@ +package tech.powerjob.server.core.scheduler; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.server.core.scheduler.auxiliary.TimingStrategyHandler; +import tech.powerjob.server.core.scheduler.auxiliary.impl.*; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.List; + +/** + * @author Echo009 + * @since 2022/3/29 + */ +public class TimingStrategyServiceTest { + + private final TimingStrategyService timingStrategyService; + + public TimingStrategyServiceTest() { + List timingStrategyHandlers = new ArrayList<>(); + timingStrategyHandlers.add(new CronTimingStrategyHandler()); + timingStrategyHandlers.add(new ApiTimingStrategyHandler()); + timingStrategyHandlers.add(new FixedDelayTimingStrategyHandler()); + timingStrategyHandlers.add(new FixedRateTimingStrategyHandler()); + timingStrategyHandlers.add(new WorkflowTimingStrategyHandler()); + timingStrategyService = new TimingStrategyService(timingStrategyHandlers); + } + + + @Test + public void testApiAndWorkflow() { + // api + Assertions.assertDoesNotThrow(() -> timingStrategyService.validate(TimeExpressionType.API, "", null, null)); + List triggerTimes = timingStrategyService.calculateNextTriggerTimes(TimeExpressionType.API, "", null, null); + Assert.assertEquals(1, triggerTimes.size()); + // workflow + Assertions.assertDoesNotThrow(() -> timingStrategyService.validate(TimeExpressionType.WORKFLOW, "", null, null)); + triggerTimes = timingStrategyService.calculateNextTriggerTimes(TimeExpressionType.WORKFLOW, "", null, null); + Assert.assertEquals(1, triggerTimes.size()); + } + + @Test + public void testFixedRate() { + // fixed rate + Assertions.assertThrows(PowerJobException.class, () -> timingStrategyService.validate(TimeExpressionType.FIXED_RATE, "-0", null, null)); + Assertions.assertThrows(PowerJobException.class, () -> timingStrategyService.validate(TimeExpressionType.FIXED_RATE, "FFF", null, null)); + Assertions.assertThrows(PowerJobException.class, () -> timingStrategyService.validate(TimeExpressionType.FIXED_RATE, "300000", null, null)); + Assertions.assertDoesNotThrow(() -> timingStrategyService.validate(TimeExpressionType.FIXED_RATE, "10000", null, null)); + + long timeParam = 1000; + List triggerTimes = timingStrategyService.calculateNextTriggerTimes(TimeExpressionType.FIXED_RATE, String.valueOf(timeParam), null, null); + Assert.assertEquals(5, triggerTimes.size()); + + Long startTime = System.currentTimeMillis() + timeParam; + Long endTime = System.currentTimeMillis() + timeParam * 3; + triggerTimes = timingStrategyService.calculateNextTriggerTimes(TimeExpressionType.FIXED_RATE, String.valueOf(timeParam), startTime, endTime); + Assert.assertEquals(3, triggerTimes.size()); + + } + + @Test + public void testFixedDelay() { + // fixed delay + Assertions.assertThrows(PowerJobException.class, () -> timingStrategyService.validate(TimeExpressionType.FIXED_DELAY, "-0", null, null)); + Assertions.assertThrows(PowerJobException.class, () -> timingStrategyService.validate(TimeExpressionType.FIXED_DELAY, "FFF", null, null)); + Assertions.assertThrows(PowerJobException.class, () -> timingStrategyService.validate(TimeExpressionType.FIXED_DELAY, "300000", null, null)); + Assertions.assertDoesNotThrow(() -> timingStrategyService.validate(TimeExpressionType.FIXED_DELAY, "10000", null, null)); + + List triggerTimes = timingStrategyService.calculateNextTriggerTimes(TimeExpressionType.FIXED_DELAY, "1", null, null); + Assert.assertEquals(1, triggerTimes.size()); + } + + + @Test + public void testCron() { + Assertions.assertThrows(IllegalArgumentException.class, () -> timingStrategyService.validate(TimeExpressionType.CRON, "00 00 07 8-14,22-28 * 8", null, null)); + Assertions.assertDoesNotThrow(() -> timingStrategyService.validate(TimeExpressionType.CRON, "00 00 07 8-14,22-28 * 2", null, null)); + // https://github.com/PowerJob/PowerJob/issues/382 + // 支持同时指定 day-of-week 、day-of-month + // 每隔一周的周一早上 7 点执行一次 + LocalDateTime start = LocalDateTime.of(2088, 5, 24, 7, 0, 0); + LocalDateTime end = LocalDateTime.of(2088, 7, 12, 7, 0, 0); + List triggerTimes = timingStrategyService.calculateNextTriggerTimes(TimeExpressionType.CRON, "0 0 7 8-14,22-28 * 2", start.toEpochSecond(ZoneOffset.of("+8")) * 1000, end.toEpochSecond(ZoneOffset.of("+8")) * 1000); + Assert.assertNotNull(triggerTimes); + } +} diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/CronTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/CronTest.java deleted file mode 100644 index f972eeab..00000000 --- a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/CronTest.java +++ /dev/null @@ -1,26 +0,0 @@ -package tech.powerjob.server.test; - - -import tech.powerjob.server.common.utils.CronExpression; -import org.junit.Test; - -import java.util.Date; - -/** - * CRON 测试 - * - * @author tjq - * @since 2020/10/8 - */ -public class CronTest { - - private static final String FIXED_CRON = "0 0 13 8 10 ? 2020-2020"; - - @Test - public void testFixedTimeCron() throws Exception { - CronExpression cronExpression = new CronExpression(FIXED_CRON); - System.out.println(cronExpression.getCronExpression()); - System.out.println(cronExpression.getNextValidTimeAfter(new Date())); - } - -} diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/UtilsTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/UtilsTest.java index 81e12aad..de928c76 100644 --- a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/UtilsTest.java +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/UtilsTest.java @@ -1,11 +1,9 @@ package tech.powerjob.server.test; -import tech.powerjob.server.common.utils.CronExpression; import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; import org.junit.Test; -import java.util.Date; import java.util.List; import java.util.Objects; import java.util.TimeZone; @@ -19,14 +17,6 @@ import java.util.stream.Collectors; */ public class UtilsTest { - @Test - public void testCronExpression() throws Exception { - String cron = "0 * * * * ? *"; - CronExpression cronExpression = new CronExpression(cron); - final Date nextValidTimeAfter = cronExpression.getNextValidTimeAfter(new Date()); - System.out.println(nextValidTimeAfter); - } - @Test public void normalTest() { String s = "000000000111010000001100000000010110100110100000000001000000000000"; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 902dc726..2526c96f 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -270,7 +270,7 @@ public class ProcessorTracker { // 超时检查,如果超时则自动关闭 TaskTracker long interval = System.currentTimeMillis() - startTime; // 秒级任务的ProcessorTracker不应该关闭 - if (!TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) { + if (!TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) { if (interval > instanceInfo.getInstanceTimeoutMS()) { log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", instanceId); destroy(); @@ -385,7 +385,7 @@ public class ProcessorTracker { if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) { return instanceInfo.getThreadConcurrency(); } - if (TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) { + if (TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) { return instanceInfo.getThreadConcurrency(); } return 2;