feat: support job's lifecycle #208

This commit is contained in:
tjq 2021-04-05 22:43:48 +08:00
parent e094c22952
commit de7f295bb5
4 changed files with 129 additions and 45 deletions

View File

@ -1,13 +1,18 @@
package tech.powerjob.server.common.utils; package tech.powerjob.server.common.utils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.RemoteConstant;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ntp.NTPUDPClient; import org.apache.commons.net.ntp.NTPUDPClient;
import org.apache.commons.net.ntp.NtpV3Packet; import org.apache.commons.net.ntp.NtpV3Packet;
import org.apache.commons.net.ntp.TimeInfo; import org.apache.commons.net.ntp.TimeInfo;
import tech.powerjob.server.common.SJ;
import java.net.InetAddress; import java.net.InetAddress;
import java.text.ParseException;
import java.util.Date;
import java.util.List; import java.util.List;
/** /**
@ -24,7 +29,44 @@ public class TimeUtils {
// 最大误差 5S // 最大误差 5S
private static final long MAX_OFFSET = 5000; private static final long MAX_OFFSET = 5000;
public static void check() throws TimeCheckException { /**
* 计算 CRON 表达式的下一次执行时间
* @param cron CRON 表达式
* @param startTime 下一次执行时间的起始时间不得早于该时间
* @param sortedLifeCycle 额外的生命周期格式为 ts1-ts2,ts3-ts4下一次执行时间必须符合该区间范围
* @return 执行时间 or NULL
* @throws ParseException CRON 表达式解析异常
*/
public static Date calculateNextCronTime(String cron, long startTime, String sortedLifeCycle) throws ParseException {
CronExpression ce = new CronExpression(cron);
if (StringUtils.isEmpty(sortedLifeCycle)) {
return ce.getNextValidTimeAfter(new Date(startTime));
}
List<Pair<Long, Long>> remainingLifecycle = Lists.newLinkedList();
List<String> lifeCycle = SJ.COMMA_SPLITTER.splitToList(sortedLifeCycle);
// 只保留结束时间在当前时间前的生命周期
lifeCycle.forEach(range -> {
String[] split = range.split("-");
long end = Long.parseLong(split[1]);
if (end < startTime) {
return;
}
remainingLifecycle.add(Pair.of(Long.valueOf(split[0]), end));
});
for (Pair<Long, Long> range : remainingLifecycle) {
long newStartTime = Math.max(range.getLeft(), startTime);
Date nextValidTime = ce.getNextValidTimeAfter(new Date(newStartTime));
if (nextValidTime != null && nextValidTime.getTime() <= range.getRight()) {
return nextValidTime;
}
}
return null;
}
public static void checkServerTime() throws TimeCheckException {
NTPUDPClient timeClient = new NTPUDPClient(); NTPUDPClient timeClient = new NTPUDPClient();
@ -50,7 +92,7 @@ public class TimeUtils {
} }
} }
throw new TimeCheckException("no available ntp server, maybe alibaba, sjtu and apple are both collapse"); throw new TimeCheckException("no available ntp server, maybe alibaba, sjtu and apple are both collapse");
}finally { } finally {
timeClient.close(); timeClient.close();
} }
} }

View File

@ -1,23 +1,5 @@
package tech.powerjob.server.core.scheduler; package tech.powerjob.server.core.scheduler;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.CronExpression;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.service.JobService;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -28,6 +10,24 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import tech.powerjob.server.common.utils.TimeUtils;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.core.service.JobService;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.text.ParseException; import java.text.ParseException;
@ -254,7 +254,7 @@ public class PowerScheduleService {
} }
private void refreshJob(JobInfoDO jobInfo) throws ParseException { private void refreshJob(JobInfoDO jobInfo) throws ParseException {
Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression()); Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression(), jobInfo.getLifecycle());
JobInfoDO updatedJobInfo = new JobInfoDO(); JobInfoDO updatedJobInfo = new JobInfoDO();
BeanUtils.copyProperties(jobInfo, updatedJobInfo); BeanUtils.copyProperties(jobInfo, updatedJobInfo);
@ -271,7 +271,7 @@ public class PowerScheduleService {
} }
private void refreshWorkflow(WorkflowInfoDO wfInfo) throws ParseException { private void refreshWorkflow(WorkflowInfoDO wfInfo) throws ParseException {
Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression(), wfInfo.getLifecycle());
WorkflowInfoDO updateEntity = new WorkflowInfoDO(); WorkflowInfoDO updateEntity = new WorkflowInfoDO();
BeanUtils.copyProperties(wfInfo, updateEntity); BeanUtils.copyProperties(wfInfo, updateEntity);
@ -295,11 +295,10 @@ public class PowerScheduleService {
* @return 下一次调度时间 * @return 下一次调度时间
* @throws ParseException 异常 * @throws ParseException 异常
*/ */
private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression) throws ParseException { private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression, String lifecycle) throws ParseException {
CronExpression ce = new CronExpression(cronExpression);
// 取最大值防止长时间未调度任务被连续调度原来DISABLE的任务突然被打开不取最大值会补上过去所有的调度 // 取最大值防止长时间未调度任务被连续调度原来DISABLE的任务突然被打开不取最大值会补上过去所有的调度
long benchmarkTime = Math.max(System.currentTimeMillis(), preTriggerTime); long benchmarkTime = Math.max(System.currentTimeMillis(), preTriggerTime);
return ce.getNextValidTimeAfter(new Date(benchmarkTime)); return TimeUtils.calculateNextCronTime(cronExpression, benchmarkTime, lifecycle);
} }
} }

View File

@ -1,28 +1,28 @@
package tech.powerjob.server.core.service; package tech.powerjob.server.core.service;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.response.JobInfoDTO;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.CronExpression;
import tech.powerjob.server.persistence.QueryConvertUtils;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.data.jpa.domain.Specification; import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.response.JobInfoDTO;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import tech.powerjob.server.common.utils.TimeUtils;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.persistence.QueryConvertUtils;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.text.ParseException; import java.text.ParseException;
@ -238,8 +238,7 @@ public class JobService {
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
if (timeExpressionType == TimeExpressionType.CRON) { if (timeExpressionType == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); Date nextValidTime = TimeUtils.calculateNextCronTime(jobInfoDO.getTimeExpression(), System.currentTimeMillis(), jobInfoDO.getLifecycle());
Date nextValidTime = cronExpression.getNextValidTimeAfter(now);
if (nextValidTime == null) { if (nextValidTime == null) {
throw new PowerJobException("cron expression is out of date: " + jobInfoDO.getTimeExpression()); throw new PowerJobException("cron expression is out of date: " + jobInfoDO.getTimeExpression());
} }

View File

@ -0,0 +1,44 @@
package tech.powerjob.server.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Date;
@Slf4j
class TimeUtilsTest {
private static final String CRON = "0 0/5 * * * ? *";
@Test
void testEmptyOfLifecycle() throws Exception {
Date date = TimeUtils.calculateNextCronTime(CRON, System.currentTimeMillis(), null);
log.info("testEmptyOfLifecycle: {}", date);
Assertions.assertNotNull(date);
}
@Test
void testEndOfLifecycle() throws Exception {
Date date = TimeUtils.calculateNextCronTime(CRON, System.currentTimeMillis(), "1217633201188-1417633201188");
log.info("testEndOfLifecycle: {}", date);
Assertions.assertNull(date);
}
@Test
void testMultiLifecycle() throws Exception {
String lifecycle = "1217633201188-1417633201188,1417633201188-1717633201188";
Date date1 = TimeUtils.calculateNextCronTime(CRON, System.currentTimeMillis(), lifecycle);
Date date2 = TimeUtils.calculateNextCronTime(CRON, System.currentTimeMillis(), null);
log.info("testMultiLifecycle date1: {}", date1);
log.info("testMultiLifecycle date2: {}", date2);
Assertions.assertEquals(date1, date2);
}
@Test
void check() {
TimeUtils.checkServerTime();
}
}