feat: replace cron implementation and support job lifecycle #382 #208

This commit is contained in:
Echo009 2022-02-25 18:01:14 +08:00
parent abf266b7f8
commit 8909584976
29 changed files with 726 additions and 203 deletions

View File

@ -24,5 +24,9 @@ public class PowerJobDKey {
public static final String IGNORED_NETWORK_INTERFACE_REGEX = "powerjob.network.interface.ignored";
public static final String WORKER_STATUS_CHECK_PERIOD = "powerjob.worker.status-check.normal.period";
/**
* ms
*/
public static final String FREQUENCY_JOB_MAX_INTERVAL = "powerjob.server.frequency-job.max-interval";
}

View File

@ -3,6 +3,7 @@ package tech.powerjob.common.enums;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import java.util.List;
@ -14,6 +15,7 @@ import java.util.List;
*/
@Getter
@AllArgsConstructor
@ToString
public enum TimeExpressionType {
API(1),
@ -24,7 +26,11 @@ public enum TimeExpressionType {
int v;
public static final List<Integer> frequentTypes = Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v);
public static final List<Integer> FREQUENT_TYPES = Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v);
/**
* 首次计算触发时间时必须计算出一个有效值
*/
public static final List<Integer> INSPECT_TYPES = Lists.newArrayList(CRON.v);
public static TimeExpressionType of(int v) {
for (TimeExpressionType type : values()) {

View File

@ -0,0 +1,28 @@
package tech.powerjob.common.model;
import lombok.Data;
import tech.powerjob.common.serialize.JsonUtils;
/**
* @author Echo009
* @since 2022/3/22
*/
@Data
public class LifeCycle {
public static final LifeCycle EMPTY_LIFE_CYCLE = new LifeCycle();
private Long start;
private Long end;
public static LifeCycle parse(String lifeCycle){
try {
return JsonUtils.parseObject(lifeCycle,LifeCycle.class);
}catch (Exception e){
// ignore
return EMPTY_LIFE_CYCLE;
}
}
}

View File

@ -5,6 +5,7 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.utils.CommonUtils;
import lombok.Data;
import tech.powerjob.common.response.JobInfoDTO;
@ -132,7 +133,7 @@ public class SaveJobInfoRequest {
private DispatchStrategy dispatchStrategy;
private String lifecycle;
private LifeCycle lifecycle;
/**
* alarm config
*/

View File

@ -1,6 +1,7 @@
package tech.powerjob.common.request.http;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.utils.CommonUtils;
import com.google.common.collect.Lists;
@ -63,6 +64,8 @@ public class SaveWorkflowRequest implements Serializable {
/** 点线表示法*/
private PEWorkflowDAG dag;
private LifeCycle lifeCycle;
public void valid() {
CommonUtils.requireNonNull(wfName, "workflow name can't be empty");
CommonUtils.requireNonNull(appId, "appId can't be empty");

View File

@ -248,6 +248,11 @@
<version>${vertx-web.version}</version>
</dependency>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
<version>9.1.6</version>
</dependency>
<!-- swagger2 -->
<dependency>

View File

@ -111,7 +111,7 @@ public class DispatchService {
long current = System.currentTimeMillis();
Integer maxInstanceNum = jobInfo.getMaxInstanceNum();
// 秒级任务只派发到一台机器具体的 maxInstanceNum TaskTracker 控制
if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) {
if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) {
maxInstanceNum = 1;
}

View File

@ -354,7 +354,7 @@ public class InstanceLogService {
instanceId2LastReportTime.keySet().forEach(instanceId -> {
try {
JobInfoDO jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(instanceId);
if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) {
if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) {
frequentInstanceIds.add(instanceId);
}
}catch (Exception ignore) {

View File

@ -7,6 +7,7 @@ import org.springframework.util.StringUtils;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.request.ServerStopInstanceReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.server.common.module.WorkerInfo;
@ -96,26 +97,28 @@ public class InstanceManager {
// FREQUENT 任务没有失败重试机制TaskTracker一直运行即可只需要将存活信息同步到DB即可
// FREQUENT 任务的 newStatus 只有2中情况一种是 RUNNING一种是 FAILED表示该机器 overload需要重新选一台机器执行
// 综上直接把 status runningNum 同步到DB即可
if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) {
if (TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType)) {
// 如果实例处于失败状态则说明该 worker 失联了一段时间 server 判定为宕机而此时该秒级任务有可能已经重新派发了故需要 Kill 掉该实例
// fix issue 375
if (instanceInfo.getStatus() == InstanceStatus.FAILED.getV()) {
log.warn("[InstanceManager-{}] receive TaskTracker's report: {}, but current instance is already failed, this instance should be killed.", instanceId, req);
Optional<WorkerInfo> workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress());
if (workerInfoOpt.isPresent()){
ServerStopInstanceReq stopInstanceReq = new ServerStopInstanceReq(instanceId);
WorkerInfo workerInfo = workerInfoOpt.get();
transportService.tell(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), stopInstanceReq);
}
stopInstance(instanceId, instanceInfo);
return;
}
instanceInfo.setStatus(receivedInstanceStatus.getV());
LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle());
// 检查生命周期是否已结束
if (lifeCycle.getEnd() != null && lifeCycle.getEnd() <= System.currentTimeMillis()) {
stopInstance(instanceId, instanceInfo);
instanceInfo.setStatus(InstanceStatus.SUCCEED.getV());
} else {
instanceInfo.setStatus(receivedInstanceStatus.getV());
}
instanceInfo.setResult(req.getResult());
instanceInfo.setRunningTimes(req.getTotalTaskNum());
instanceInfoRepository.saveAndFlush(instanceInfo);
// 任务需要告警
if (req.isNeedAlert()) {
log.info("[InstanceManager-{}] receive frequent task alert req,time:{},content:{}",instanceId,req.getReportTime(),req.getAlertContent());
log.info("[InstanceManager-{}] receive frequent task alert req,time:{},content:{}", instanceId, req.getReportTime(), req.getAlertContent());
alert(instanceId, req.getAlertContent());
}
return;
@ -166,6 +169,15 @@ public class InstanceManager {
}
private void stopInstance(Long instanceId, InstanceInfoDO instanceInfo) {
Optional<WorkerInfo> workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress());
if (workerInfoOpt.isPresent()) {
ServerStopInstanceReq stopInstanceReq = new ServerStopInstanceReq(instanceId);
WorkerInfo workerInfo = workerInfoOpt.get();
transportService.tell(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), stopInstanceReq);
}
}
/**
* 收尾完成的任务实例
*

View File

@ -148,7 +148,7 @@ public class InstanceStatusCheckService {
SwitchableStatus switchableStatus = SwitchableStatus.of(jobInfoDO.getStatus());
// 如果任务已关闭则不进行重试将任务置为失败即可秒级任务也直接置为失败由派发器重新调度
if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) {
if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType.getV())) {
updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT);
return;
}

View File

@ -2,9 +2,9 @@ package tech.powerjob.server.core.scheduler;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.CronExpression;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
@ -30,7 +30,6 @@ import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.text.ParseException;
import java.util.*;
import java.util.stream.Collectors;
@ -69,6 +68,8 @@ public class PowerScheduleService {
@Resource
private JobService jobService;
@Resource
private TimingStrategyService timingStrategyService;
private static final long SCHEDULE_RATE = 15000;
@ -223,7 +224,7 @@ public class PowerScheduleService {
Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
try {
// 查询所有的秒级任务只包含ID
List<Long> jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.frequentTypes);
List<Long> jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.FREQUENT_TYPES);
if (CollectionUtils.isEmpty(jobIds)) {
return;
}
@ -242,10 +243,21 @@ public class PowerScheduleService {
return;
}
log.info("[FrequentScheduler] These frequent jobs will be scheduled {}.", notRunningJobIds);
notRunningJobIds.forEach(jobId -> {
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
jobInfoOpt.ifPresent(jobInfoDO -> jobService.runJob(jobInfoDO.getAppId(), jobId, null, 0L));
jobInfoOpt.ifPresent(jobInfoDO -> {
LifeCycle lifeCycle = LifeCycle.parse(jobInfoDO.getLifecycle());
// 生命周期已经结束
if (lifeCycle.getEnd() != null && lifeCycle.getEnd() < System.currentTimeMillis()) {
jobInfoDO.setStatus(SwitchableStatus.DISABLE.getV());
jobInfoDO.setGmtModified(new Date());
jobInfoRepository.saveAndFlush(jobInfoDO);
log.info("[FrequentScheduler] disable frequent job,id:{}.", jobInfoDO.getId());
} else if (lifeCycle.getStart() == null || lifeCycle.getStart() < System.currentTimeMillis() + SCHEDULE_RATE * 2) {
log.info("[FrequentScheduler] schedule frequent job,id:{}.", jobInfoDO.getId());
jobService.runJob(jobInfoDO.getAppId(), jobId, null, Optional.of(lifeCycle.getStart()).orElse(0L) - System.currentTimeMillis());
}
});
});
} catch (Exception e) {
log.error("[FrequentScheduler] schedule frequent job failed.", e);
@ -253,8 +265,9 @@ public class PowerScheduleService {
});
}
private void refreshJob(JobInfoDO jobInfo) throws ParseException {
Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression());
private void refreshJob(JobInfoDO jobInfo) {
LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle());
Long nextTriggerTime = timingStrategyService.calculateNextTriggerTime(jobInfo.getNextTriggerTime(), TimeExpressionType.CRON, jobInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd());
JobInfoDO updatedJobInfo = new JobInfoDO();
BeanUtils.copyProperties(jobInfo, updatedJobInfo);
@ -263,15 +276,16 @@ public class PowerScheduleService {
log.warn("[Job-{}] this job won't be scheduled anymore, system will set the status to DISABLE!", jobInfo.getId());
updatedJobInfo.setStatus(SwitchableStatus.DISABLE.getV());
} else {
updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime());
updatedJobInfo.setNextTriggerTime(nextTriggerTime);
}
updatedJobInfo.setGmtModified(new Date());
jobInfoRepository.save(updatedJobInfo);
}
private void refreshWorkflow(WorkflowInfoDO wfInfo) throws ParseException {
Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression());
private void refreshWorkflow(WorkflowInfoDO wfInfo) {
LifeCycle lifeCycle = LifeCycle.parse(wfInfo.getLifecycle());
Long nextTriggerTime = timingStrategyService.calculateNextTriggerTime(wfInfo.getNextTriggerTime(), TimeExpressionType.CRON, wfInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd());
WorkflowInfoDO updateEntity = new WorkflowInfoDO();
BeanUtils.copyProperties(wfInfo, updateEntity);
@ -280,26 +294,11 @@ public class PowerScheduleService {
log.warn("[Workflow-{}] this workflow won't be scheduled anymore, system will set the status to DISABLE!", wfInfo.getId());
updateEntity.setStatus(SwitchableStatus.DISABLE.getV());
} else {
updateEntity.setNextTriggerTime(nextTriggerTime.getTime());
updateEntity.setNextTriggerTime(nextTriggerTime);
}
updateEntity.setGmtModified(new Date());
workflowInfoRepository.save(updateEntity);
}
/**
* 计算下次触发时间
*
* @param preTriggerTime 前一次触发时间
* @param cronExpression CRON 表达式
* @return 下一次调度时间
* @throws ParseException 异常
*/
private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression) throws ParseException {
CronExpression ce = new CronExpression(cronExpression);
// 取最大值防止长时间未调度任务被连续调度原来DISABLE的任务突然被打开不取最大值会补上过去所有的调度
long benchmarkTime = Math.max(System.currentTimeMillis(), preTriggerTime);
return ce.getNextValidTimeAfter(new Date(benchmarkTime));
}
}

View File

@ -0,0 +1,123 @@
package tech.powerjob.server.core.scheduler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.stereotype.Service;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.server.core.scheduler.auxiliary.TimingStrategyHandler;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author Echo009
* @since 2022/3/21
*/
@Slf4j
@Service
public class TimingStrategyService {
private static final int NEXT_N_TIMES = 5;
private static final List<String> TIPS = Collections.singletonList("It is valid, but has not trigger time list!");
private final Map<TimeExpressionType, TimingStrategyHandler> strategyContainer;
public TimingStrategyService(List<TimingStrategyHandler> timingStrategyHandlers) {
// init
strategyContainer = new EnumMap<>(TimeExpressionType.class);
for (TimingStrategyHandler timingStrategyHandler : timingStrategyHandlers) {
strategyContainer.put(timingStrategyHandler.supportType(), timingStrategyHandler);
}
}
/**
* 计算接下来几次的调度时间
*
* @param timeExpressionType 定时表达式类型
* @param timeExpression 表达式
* @param startTime 起始时间(include)
* @param endTime 结束时间(include)
* @return 调度时间列表
*/
public List<String> calculateNextTriggerTimes(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
TimingStrategyHandler timingStrategyHandler = getHandler(timeExpressionType);
List<Long> triggerTimeList = new ArrayList<>(NEXT_N_TIMES);
Long nextTriggerTime = System.currentTimeMillis();
do {
nextTriggerTime = timingStrategyHandler.calculateNextTriggerTime(nextTriggerTime, timeExpression, startTime, endTime);
if (nextTriggerTime == null) {
break;
}
triggerTimeList.add(nextTriggerTime);
} while (triggerTimeList.size() < NEXT_N_TIMES);
if (triggerTimeList.isEmpty()) {
return TIPS;
}
return triggerTimeList.stream().map(t -> DateFormatUtils.format(t, OmsConstant.TIME_PATTERN)).collect(Collectors.toList());
}
/**
* 计算下次的调度时间
*
* @param preTriggerTime 上次触发时间(nullable)
* @param timeExpressionType 定时表达式类型
* @param timeExpression 表达式
* @param startTime 起始时间(include)
* @param endTime 结束时间(include)
* @return 下次的调度时间
*/
public Long calculateNextTriggerTime(Long preTriggerTime, TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
if (preTriggerTime == null || preTriggerTime < System.currentTimeMillis()) {
preTriggerTime = System.currentTimeMillis();
}
return getHandler(timeExpressionType).calculateNextTriggerTime(preTriggerTime, timeExpression, startTime, endTime);
}
/**
* 计算下次的调度时间并检查校验规则
*
* @param preTriggerTime 上次触发时间(nullable)
* @param timeExpressionType 定时表达式类型
* @param timeExpression 表达式
* @param startTime 起始时间(include)
* @param endTime 结束时间(include)
* @return 下次的调度时间
*/
public Long calculateNextTriggerTimeWithInspection(Long preTriggerTime, TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
Long nextTriggerTime = calculateNextTriggerTime(preTriggerTime, timeExpressionType, timeExpression, startTime, endTime);
if (TimeExpressionType.INSPECT_TYPES.contains(timeExpressionType.getV()) && nextTriggerTime == null) {
throw new PowerJobException("time expression is out of date: " + timeExpression);
}
return nextTriggerTime;
}
public void validate(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
if (endTime != null) {
if (endTime <= System.currentTimeMillis()) {
throw new PowerJobException("lifecycle is out of date!");
}
if (startTime != null && startTime > endTime) {
throw new PowerJobException("lifecycle is invalid! start time must earlier then end time.");
}
}
getHandler(timeExpressionType).validate(timeExpression);
}
private TimingStrategyHandler getHandler(TimeExpressionType timeExpressionType) {
TimingStrategyHandler timingStrategyHandler = strategyContainer.get(timeExpressionType);
if (timingStrategyHandler == null) {
throw new PowerJobException("No matching TimingStrategyHandler for this TimeExpressionType:" + timeExpressionType);
}
return timingStrategyHandler;
}
}

View File

@ -0,0 +1,19 @@
package tech.powerjob.server.core.scheduler.auxiliary;
/**
* @author Echo009
* @since 2022/3/22
*/
public abstract class AbstractTimingStrategyHandler implements TimingStrategyHandler {
@Override
public void validate(String timeExpression) {
// do nothing
}
@Override
public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
// do nothing
return null;
}
}

View File

@ -0,0 +1,37 @@
package tech.powerjob.server.core.scheduler.auxiliary;
import tech.powerjob.common.enums.TimeExpressionType;
/**
* @author Echo009
* @since 2022/2/24
*/
public interface TimingStrategyHandler {
/**
* 校验表达式
*
* @param timeExpression 时间表达式
*/
void validate(String timeExpression);
/**
* 计算下次触发时间
*
* @param preTriggerTime 上次触发时间 (not null)
* @param timeExpression 时间表达式
* @param startTime 开始时间(include)
* @param endTime 结束时间(include)
* @return next trigger time
*/
Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime);
/**
* 支持的定时策略
*
* @return TimeExpressionType
*/
TimeExpressionType supportType();
}

View File

@ -0,0 +1,17 @@
package tech.powerjob.server.core.scheduler.auxiliary.impl;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.server.core.scheduler.auxiliary.AbstractTimingStrategyHandler;
/**
* @author Echo009
* @since 2022/3/22
*/
@Component
public class ApiTimingStrategyHandler extends AbstractTimingStrategyHandler {
@Override
public TimeExpressionType supportType() {
return TimeExpressionType.API;
}
}

View File

@ -0,0 +1,77 @@
package tech.powerjob.server.core.scheduler.auxiliary.impl;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinition;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.server.core.scheduler.auxiliary.TimingStrategyHandler;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Optional;
/**
* @author Echo009
* @since 2022/2/24
*/
@Component
public class CronTimingStrategyHandler implements TimingStrategyHandler {
private final CronParser cronParser;
/**
* @see CronDefinitionBuilder#instanceDefinitionFor
* <p>
* Enhanced quartz cronSupport for specifying both a day-of-week and a day-of-month parameter.
* https://github.com/PowerJob/PowerJob/issues/382
*/
public CronTimingStrategyHandler() {
CronDefinition cronDefinition = CronDefinitionBuilder.defineCron()
.withSeconds().withValidRange(0, 59).and()
.withMinutes().withValidRange(0, 59).and()
.withHours().withValidRange(0, 23).and()
.withDayOfMonth().withValidRange(1, 31).supportsL().supportsW().supportsLW().supportsQuestionMark().and()
.withMonth().withValidRange(1, 12).and()
.withDayOfWeek().withValidRange(1, 7).withMondayDoWValue(2).supportsHash().supportsL().supportsQuestionMark().and()
.withYear().withValidRange(1970, 2099).withStrictRange().optional().and()
.instance();
this.cronParser = new CronParser(cronDefinition);
}
@Override
public void validate(String timeExpression) {
cronParser.parse(timeExpression);
}
@Override
public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
Cron cron = cronParser.parse(timeExpression);
ExecutionTime executionTime = ExecutionTime.forCron(cron);
if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) {
// 需要计算出离 startTime 最近的一次真正的触发时间
Optional<ZonedDateTime> zonedDateTime = executionTime.lastExecution(ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault()));
preTriggerTime = zonedDateTime.map(dateTime -> dateTime.toEpochSecond() * 1000).orElse(startTime);
}
Instant instant = Instant.ofEpochMilli(preTriggerTime);
ZonedDateTime preZonedDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
Optional<ZonedDateTime> opt = executionTime.nextExecution(preZonedDateTime);
if (opt.isPresent()) {
long nextTriggerTime = opt.get().toEpochSecond() * 1000;
if (endTime != null && endTime < nextTriggerTime) {
return null;
}
return nextTriggerTime;
}
return null;
}
@Override
public TimeExpressionType supportType() {
return TimeExpressionType.CRON;
}
}

View File

@ -0,0 +1,38 @@
package tech.powerjob.server.core.scheduler.auxiliary.impl;
import org.springframework.stereotype.Component;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.server.core.scheduler.auxiliary.AbstractTimingStrategyHandler;
/**
* @author Echo009
* @since 2022/3/22
*/
@Component
public class FixedDelayTimingStrategyHandler extends AbstractTimingStrategyHandler {
@Override
public void validate(String timeExpression) {
long delay;
try {
delay = Long.parseLong(timeExpression);
} catch (Exception e) {
throw new PowerJobException("invalid timeExpression!");
}
// 默认 120s 超过这个限制应该考虑使用其他类型以减少资源占用
int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000"));
if (delay > maxInterval) {
throw new PowerJobException("the delay must be less than " + maxInterval + "ms");
}
if (delay <= 0) {
throw new PowerJobException("the delay must be greater than 0 ms");
}
}
@Override
public TimeExpressionType supportType() {
return TimeExpressionType.FIXED_DELAY;
}
}

View File

@ -0,0 +1,46 @@
package tech.powerjob.server.core.scheduler.auxiliary.impl;
import org.springframework.stereotype.Component;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.server.core.scheduler.auxiliary.AbstractTimingStrategyHandler;
/**
* @author Echo009
* @since 2022/3/22
*/
@Component
public class FixedRateTimingStrategyHandler extends AbstractTimingStrategyHandler {
@Override
public void validate(String timeExpression) {
long delay;
try {
delay = Long.parseLong(timeExpression);
} catch (Exception e) {
throw new PowerJobException("invalid timeExpression!");
}
// 默认 120s 超过这个限制应该使用考虑使用其他类型以减少资源占用
int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000"));
if (delay > maxInterval) {
throw new PowerJobException("the rate must be less than " + maxInterval + "ms");
}
if (delay <= 0) {
throw new PowerJobException("the rate must be greater than 0 ms");
}
}
@Override
public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
long r = startTime != null && startTime > preTriggerTime
? startTime : preTriggerTime + Long.parseLong(timeExpression);
return endTime != null && endTime < r ? null : r;
}
@Override
public TimeExpressionType supportType() {
return TimeExpressionType.FIXED_RATE;
}
}

View File

@ -0,0 +1,17 @@
package tech.powerjob.server.core.scheduler.auxiliary.impl;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.server.core.scheduler.auxiliary.AbstractTimingStrategyHandler;
/**
* @author Echo009
* @since 2022/3/22
*/
@Component
public class WorkflowTimingStrategyHandler extends AbstractTimingStrategyHandler {
@Override
public TimeExpressionType supportType() {
return TimeExpressionType.WORKFLOW;
}
}

View File

@ -1,30 +1,31 @@
package tech.powerjob.server.core.service;
import com.alibaba.fastjson.JSON;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.response.JobInfoDTO;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.CronExpression;
import tech.powerjob.server.persistence.QueryConvertUtils;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.response.JobInfoDTO;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.core.scheduler.TimingStrategyService;
import tech.powerjob.server.persistence.QueryConvertUtils;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import javax.annotation.Resource;
import java.text.ParseException;
@ -52,6 +53,8 @@ public class JobService {
private JobInfoRepository jobInfoRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@Resource
private TimingStrategyService timingStrategyService;
/**
@ -59,9 +62,8 @@ public class JobService {
*
* @param request 任务请求
* @return 创建的任务IDjobId
* @throws ParseException 异常
*/
public Long saveJob(SaveJobInfoRequest request) throws ParseException {
public Long saveJob(SaveJobInfoRequest request) {
request.valid();
@ -89,11 +91,13 @@ public class JobService {
if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) {
jobInfoDO.setNotifyUserIds(SJ.COMMA_JOINER.join(request.getNotifyUserIds()));
}
LifeCycle lifecycle = Optional.of(request.getLifecycle()).orElse(LifeCycle.EMPTY_LIFE_CYCLE);
jobInfoDO.setLifecycle(JSON.toJSONString(lifecycle));
// 检查定时策略
timingStrategyService.validate(request.getTimeExpressionType(), request.getTimeExpression(), lifecycle.getStart(), lifecycle.getEnd());
calculateNextTriggerTime(jobInfoDO);
if (request.getId() == null) {
jobInfoDO.setGmtCreate(new Date());
}
// 检查告警配置
if (request.getAlarmConfig() != null) {
@ -224,7 +228,7 @@ public class JobService {
jobInfoRepository.saveAndFlush(jobInfoDO);
// 2. 关闭秒级任务
if (!TimeExpressionType.frequentTypes.contains(jobInfoDO.getTimeExpressionType())) {
if (!TimeExpressionType.FREQUENT_TYPES.contains(jobInfoDO.getTimeExpressionType())) {
return;
}
List<InstanceInfoDO> executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.GENERALIZED_RUNNING_STATUS);
@ -244,23 +248,18 @@ public class JobService {
});
}
private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws ParseException {
private void calculateNextTriggerTime(JobInfoDO jobInfo) {
// 计算下次调度时间
Date now = new Date();
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
if (timeExpressionType == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(now);
if (nextValidTime == null) {
throw new PowerJobException("cron expression is out of date: " + jobInfoDO.getTimeExpression());
}
jobInfoDO.setNextTriggerTime(nextValidTime.getTime());
} else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) {
jobInfoDO.setTimeExpression(null);
if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) {
// 固定频率类型的任务不计算
jobInfo.setTimeExpression(null);
} else {
LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle());
Long nextValidTime = timingStrategyService.calculateNextTriggerTimeWithInspection(jobInfo.getNextTriggerTime(), TimeExpressionType.CRON, jobInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd());
jobInfo.setNextTriggerTime(nextValidTime);
}
// 重写最后修改时间
jobInfoDO.setGmtModified(now);
jobInfo.setGmtModified(new Date());
}
private void fillDefaultValue(JobInfoDO jobInfoDO) {

View File

@ -1,71 +0,0 @@
package tech.powerjob.server.core.service;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.server.common.utils.CronExpression;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.text.ParseException;
import java.util.Collections;
import java.util.Date;
import java.util.List;
/**
* 校验服务
*
* @author tjq
* @since 2020/11/28
*/
public class ValidateService {
private static final int NEXT_N_TIMES = 5;
/**
* 计算指定时间表达式接下来的运行状况
* @param timeExpressionType 时间表达式类型
* @param timeExpression 时间表达式
* @return 最近 N 次运行的时间
* @throws Exception 异常
*/
public static List<String> calculateNextTriggerTime(TimeExpressionType timeExpressionType, String timeExpression) throws Exception {
switch (timeExpressionType) {
case API: return Lists.newArrayList(OmsConstant.NONE);
case WORKFLOW: return Lists.newArrayList("VALID: depends on workflow");
case CRON: return calculateCronExpression(timeExpression);
case FIXED_RATE: return calculateFixRate(timeExpression);
case FIXED_DELAY: return Lists.newArrayList("VALID: depends on execution cost time");
}
// impossible
return Collections.emptyList();
}
private static List<String> calculateFixRate(String timeExpression) {
List<String> result = Lists.newArrayList();
long delay = Long.parseLong(timeExpression);
for (int i = 0; i < NEXT_N_TIMES; i++) {
long nextTime = System.currentTimeMillis() + i * delay;
result.add(DateFormatUtils.format(nextTime, OmsConstant.TIME_PATTERN));
}
return result;
}
private static List<String> calculateCronExpression(String expression) throws ParseException {
CronExpression cronExpression = new CronExpression(expression);
List<String> result = Lists.newArrayList();
Date time = new Date();
for (int i = 0; i < NEXT_N_TIMES; i++) {
Date nextValidTime = cronExpression.getNextValidTimeAfter(time);
if (nextValidTime == null) {
break;
}
result.add(DateFormatUtils.format(nextValidTime.getTime(), OmsConstant.TIME_PATTERN));
time = nextValidTime;
}
if (result.isEmpty()) {
result.add("INVALID: no next validate schedule time");
}
return result;
}
}

View File

@ -10,13 +10,14 @@ import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
import tech.powerjob.common.request.http.SaveWorkflowRequest;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import tech.powerjob.server.common.utils.CronExpression;
import tech.powerjob.server.core.scheduler.TimingStrategyService;
import tech.powerjob.server.core.service.NodeValidateService;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
@ -28,7 +29,6 @@ import tech.powerjob.server.remote.server.redirector.DesignateServer;
import javax.annotation.Resource;
import javax.transaction.Transactional;
import java.text.ParseException;
import java.util.*;
/**
@ -51,6 +51,8 @@ public class WorkflowService {
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
@Resource
private NodeValidateService nodeValidateService;
@Resource
private TimingStrategyService timingStrategyService;
/**
* 保存/修改工作流信息
@ -61,7 +63,7 @@ public class WorkflowService {
* @return 工作流ID
*/
@Transactional(rollbackOn = Exception.class)
public Long saveWorkflow(SaveWorkflowRequest req) throws ParseException {
public Long saveWorkflow(SaveWorkflowRequest req) {
req.valid();
@ -83,14 +85,16 @@ public class WorkflowService {
if (req.getNotifyUserIds() != null) {
wf.setNotifyUserIds(SJ.COMMA_JOINER.join(req.getNotifyUserIds()));
}
// 计算 NextTriggerTime
if (req.getTimeExpressionType() == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(req.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date());
wf.setNextTriggerTime(nextValidTime.getTime());
} else {
if (req.getLifeCycle() != null) {
wf.setLifecycle(JSON.toJSONString(req.getLifeCycle()));
}
if (TimeExpressionType.FREQUENT_TYPES.contains(req.getTimeExpressionType().getV())){
// 固定频率类型的任务不计算
wf.setTimeExpression(null);
}else {
LifeCycle lifeCycle = Optional.of(req.getLifeCycle()).orElse(LifeCycle.EMPTY_LIFE_CYCLE);
Long nextValidTime = timingStrategyService.calculateNextTriggerTimeWithInspection(wf.getNextTriggerTime(), TimeExpressionType.CRON, wf.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd());
wf.setNextTriggerTime(nextValidTime);
}
// 新增工作流需要先 save 一下获取 ID
if (wfId == null) {

View File

@ -1,30 +1,41 @@
package tech.powerjob.server.web.controller;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.server.core.service.ValidateService;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.server.core.scheduler.TimingStrategyService;
import javax.annotation.Resource;
import java.util.List;
/**
* 校验控制器
*
* @author tjq
* @author Echo009
* @since 2020/11/28
*/
@RestController
@RequestMapping("/validate")
public class ValidateController {
@Resource
private TimingStrategyService timingStrategyService;
@GetMapping("/timeExpression")
public ResultDTO<List<String>> checkTimeExpression(TimeExpressionType timeExpressionType, String timeExpression) {
public ResultDTO<List<String>> checkTimeExpression(TimeExpressionType timeExpressionType,
String timeExpression,
@RequestParam(required = false) Long startTime,
@RequestParam(required = false) Long endTime
) {
try {
return ResultDTO.success(ValidateService.calculateNextTriggerTime(timeExpressionType, timeExpression));
timingStrategyService.validate(timeExpressionType, timeExpression, startTime, endTime);
return ResultDTO.success(timingStrategyService.calculateNextTriggerTimes(timeExpressionType, timeExpression, startTime, endTime));
} catch (Exception e) {
return ResultDTO.success(Lists.newArrayList(ExceptionUtils.getMessage(e)));
}

View File

@ -1,4 +1,4 @@
package tech.powerjob.server.common.utils;
package tech.powerjob.server.core.scheduler;
/*
Copyright [2020] [PowerJob]

View File

@ -0,0 +1,93 @@
package tech.powerjob.server.core.scheduler;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.assertj.core.util.Lists;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.server.core.scheduler.auxiliary.impl.CronTimingStrategyHandler;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.List;
/**
* @author Echo009
* @since 2022/3/29
*/
@Slf4j
public class CronTimingStrategyHandlerTest {
private final CronTimingStrategyHandler cronTimingStrategyHandler = new CronTimingStrategyHandler();
private static final List<String> CRON_LIST = Lists.list(
"0 0 10,11,12 * * ?",
"0 0/30 9-17 * * ?",
"0 0 12 ? * WED",
"0 0 12 * * ?",
"0 15 11 ? * *",
"0 15 11 * * ?",
"0 15 11 * * ? *",
"0 15 11 * * ? 2088",
"0 * 14 * * ?",
"0 0/5 14 * * ?",
"0 0/5 14,18 * * ?",
"0 0-5 14 * * ?",
"0 10,44 14 ? 3 WED",
"0 15 11 ? * MON-FRI",
"0 15 11 15 * ?",
"0 15 11 L * ?",
"0 15 11 ? * 6L",
"0 15 11 ? * 6L 2087-2088",
"0 15 11 ? * 6L 2087-2088",
"0 15 11 ? * 6#3",
"0 0 0 1 1 ? 2088"
);
@SneakyThrows
@Test
public void compareToQuartzCron() {
// 对比 quartz cron 结果
for (String cron : CRON_LIST) {
Long referenceTime = System.currentTimeMillis();
int count = 0;
while (referenceTime != null && count < 50) {
CronExpression cronExpression = new CronExpression(cron);
Date nextValidTimeAfter = cronExpression.getNextValidTimeAfter(new Date(referenceTime));
Long quartzRes = nextValidTimeAfter == null ? null : nextValidTimeAfter.getTime();
Long newRes = cronTimingStrategyHandler.calculateNextTriggerTime(referenceTime, cron, null, null);
log.info("cron:'{}',reference time:{},quartz result:{},new result:{}", cron, referenceTime, quartzRes, newRes);
referenceTime = newRes;
count++;
Assertions.assertEquals(newRes, quartzRes);
}
}
}
@Test
public void test01() {
// cron 的有效区间小于 lifecycle
String cron = "0 15 11 * * ? 2088-2089";
Long referenceTime = System.currentTimeMillis();
LocalDateTime start = LocalDateTime.of(2088, 5, 1, 11, 15, 0);
LocalDateTime end = LocalDateTime.of(2099, 5, 1, 11, 15, 0);
Long nextTriggerTime = cronTimingStrategyHandler.calculateNextTriggerTime(referenceTime, cron, start.toEpochSecond(ZoneOffset.of("+8")) * 1000, end.toEpochSecond(ZoneOffset.of("+8")) * 1000);
Assertions.assertEquals("2088-05-01 11:15:00",DateFormatUtils.format(nextTriggerTime, OmsConstant.TIME_PATTERN));
}
@Test
public void test02() {
// cron 的有效区间大于 lifecycle
String cron = "0 15 11 * * ? 2077-2099";
Long referenceTime = System.currentTimeMillis();
LocalDateTime start = LocalDateTime.of(2088, 5, 1, 11, 15, 0);
LocalDateTime end = LocalDateTime.of(2099, 5, 1, 11, 15, 0);
Long nextTriggerTime = cronTimingStrategyHandler.calculateNextTriggerTime(referenceTime, cron, start.toEpochSecond(ZoneOffset.of("+8")) * 1000, end.toEpochSecond(ZoneOffset.of("+8")) * 1000);
Assertions.assertEquals("2088-05-01 11:15:00",DateFormatUtils.format(nextTriggerTime, OmsConstant.TIME_PATTERN));
}
}

View File

@ -0,0 +1,91 @@
package tech.powerjob.server.core.scheduler;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.server.core.scheduler.auxiliary.TimingStrategyHandler;
import tech.powerjob.server.core.scheduler.auxiliary.impl.*;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
/**
* @author Echo009
* @since 2022/3/29
*/
public class TimingStrategyServiceTest {
private final TimingStrategyService timingStrategyService;
public TimingStrategyServiceTest() {
List<TimingStrategyHandler> timingStrategyHandlers = new ArrayList<>();
timingStrategyHandlers.add(new CronTimingStrategyHandler());
timingStrategyHandlers.add(new ApiTimingStrategyHandler());
timingStrategyHandlers.add(new FixedDelayTimingStrategyHandler());
timingStrategyHandlers.add(new FixedRateTimingStrategyHandler());
timingStrategyHandlers.add(new WorkflowTimingStrategyHandler());
timingStrategyService = new TimingStrategyService(timingStrategyHandlers);
}
@Test
public void testApiAndWorkflow() {
// api
Assertions.assertDoesNotThrow(() -> timingStrategyService.validate(TimeExpressionType.API, "", null, null));
List<String> triggerTimes = timingStrategyService.calculateNextTriggerTimes(TimeExpressionType.API, "", null, null);
Assert.assertEquals(1, triggerTimes.size());
// workflow
Assertions.assertDoesNotThrow(() -> timingStrategyService.validate(TimeExpressionType.WORKFLOW, "", null, null));
triggerTimes = timingStrategyService.calculateNextTriggerTimes(TimeExpressionType.WORKFLOW, "", null, null);
Assert.assertEquals(1, triggerTimes.size());
}
@Test
public void testFixedRate() {
// fixed rate
Assertions.assertThrows(PowerJobException.class, () -> timingStrategyService.validate(TimeExpressionType.FIXED_RATE, "-0", null, null));
Assertions.assertThrows(PowerJobException.class, () -> timingStrategyService.validate(TimeExpressionType.FIXED_RATE, "FFF", null, null));
Assertions.assertThrows(PowerJobException.class, () -> timingStrategyService.validate(TimeExpressionType.FIXED_RATE, "300000", null, null));
Assertions.assertDoesNotThrow(() -> timingStrategyService.validate(TimeExpressionType.FIXED_RATE, "10000", null, null));
long timeParam = 1000;
List<String> triggerTimes = timingStrategyService.calculateNextTriggerTimes(TimeExpressionType.FIXED_RATE, String.valueOf(timeParam), null, null);
Assert.assertEquals(5, triggerTimes.size());
Long startTime = System.currentTimeMillis() + timeParam;
Long endTime = System.currentTimeMillis() + timeParam * 3;
triggerTimes = timingStrategyService.calculateNextTriggerTimes(TimeExpressionType.FIXED_RATE, String.valueOf(timeParam), startTime, endTime);
Assert.assertEquals(3, triggerTimes.size());
}
@Test
public void testFixedDelay() {
// fixed delay
Assertions.assertThrows(PowerJobException.class, () -> timingStrategyService.validate(TimeExpressionType.FIXED_DELAY, "-0", null, null));
Assertions.assertThrows(PowerJobException.class, () -> timingStrategyService.validate(TimeExpressionType.FIXED_DELAY, "FFF", null, null));
Assertions.assertThrows(PowerJobException.class, () -> timingStrategyService.validate(TimeExpressionType.FIXED_DELAY, "300000", null, null));
Assertions.assertDoesNotThrow(() -> timingStrategyService.validate(TimeExpressionType.FIXED_DELAY, "10000", null, null));
List<String> triggerTimes = timingStrategyService.calculateNextTriggerTimes(TimeExpressionType.FIXED_DELAY, "1", null, null);
Assert.assertEquals(1, triggerTimes.size());
}
@Test
public void testCron() {
Assertions.assertThrows(IllegalArgumentException.class, () -> timingStrategyService.validate(TimeExpressionType.CRON, "00 00 07 8-14,22-28 * 8", null, null));
Assertions.assertDoesNotThrow(() -> timingStrategyService.validate(TimeExpressionType.CRON, "00 00 07 8-14,22-28 * 2", null, null));
// https://github.com/PowerJob/PowerJob/issues/382
// 支持同时指定 day-of-week day-of-month
// 每隔一周的周一早上 7 点执行一次
LocalDateTime start = LocalDateTime.of(2088, 5, 24, 7, 0, 0);
LocalDateTime end = LocalDateTime.of(2088, 7, 12, 7, 0, 0);
List<String> triggerTimes = timingStrategyService.calculateNextTriggerTimes(TimeExpressionType.CRON, "0 0 7 8-14,22-28 * 2", start.toEpochSecond(ZoneOffset.of("+8")) * 1000, end.toEpochSecond(ZoneOffset.of("+8")) * 1000);
Assert.assertNotNull(triggerTimes);
}
}

View File

@ -1,26 +0,0 @@
package tech.powerjob.server.test;
import tech.powerjob.server.common.utils.CronExpression;
import org.junit.Test;
import java.util.Date;
/**
* CRON 测试
*
* @author tjq
* @since 2020/10/8
*/
public class CronTest {
private static final String FIXED_CRON = "0 0 13 8 10 ? 2020-2020";
@Test
public void testFixedTimeCron() throws Exception {
CronExpression cronExpression = new CronExpression(FIXED_CRON);
System.out.println(cronExpression.getCronExpression());
System.out.println(cronExpression.getNextValidTimeAfter(new Date()));
}
}

View File

@ -1,11 +1,9 @@
package tech.powerjob.server.test;
import tech.powerjob.server.common.utils.CronExpression;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.TimeZone;
@ -19,14 +17,6 @@ import java.util.stream.Collectors;
*/
public class UtilsTest {
@Test
public void testCronExpression() throws Exception {
String cron = "0 * * * * ? *";
CronExpression cronExpression = new CronExpression(cron);
final Date nextValidTimeAfter = cronExpression.getNextValidTimeAfter(new Date());
System.out.println(nextValidTimeAfter);
}
@Test
public void normalTest() {
String s = "000000000111010000001100000000010110100110100000000001000000000000";

View File

@ -270,7 +270,7 @@ public class ProcessorTracker {
// 超时检查如果超时则自动关闭 TaskTracker
long interval = System.currentTimeMillis() - startTime;
// 秒级任务的ProcessorTracker不应该关闭
if (!TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) {
if (!TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) {
if (interval > instanceInfo.getInstanceTimeoutMS()) {
log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", instanceId);
destroy();
@ -385,7 +385,7 @@ public class ProcessorTracker {
if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) {
return instanceInfo.getThreadConcurrency();
}
if (TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) {
if (TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) {
return instanceInfo.getThreadConcurrency();
}
return 2;