feat: support job export #571

This commit is contained in:
tjq 2023-03-04 15:03:22 +08:00
parent 43dfc9a265
commit 5ddaa33f47
7 changed files with 433 additions and 306 deletions

View File

@ -6,6 +6,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.exception.PowerJobException;
@ -73,6 +74,9 @@ public class JsonUtils {
}
public static <T> T parseObjectIgnoreException(String json, Class<T> clz) {
if (StringUtils.isEmpty(json)) {
return null;
}
try {
return JSON_MAPPER.readValue(json, clz);
}catch (Exception e) {

View File

@ -1,39 +1,11 @@
package tech.powerjob.server.core.service;
import lombok.RequiredArgsConstructor;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Objects;
/**
* 应用信息服务
* AppInfoService
*
* @author tjq
* @since 2020/6/20
* @since 2023/3/4
*/
@Service
@RequiredArgsConstructor
public class AppInfoService {
private final AppInfoRepository appInfoRepository;
/**
* 验证应用访问权限
* @param appName 应用名称
* @param password 密码
* @return 应用ID
*/
public Long assertApp(String appName, String password) {
AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new PowerJobException("can't find appInfo by appName: " + appName));
if (Objects.equals(appInfo.getPassword(), password)) {
return appInfo.getId();
}
throw new PowerJobException("password error!");
}
public interface AppInfoService {
Long assertApp(String appName, String password);
}

View File

@ -1,298 +1,37 @@
package tech.powerjob.server.core.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.response.JobInfoDTO;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.core.scheduler.TimingStrategyService;
import tech.powerjob.server.persistence.QueryConvertUtils;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import javax.annotation.Resource;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* 任务服务
* JobService
*
* @author tjq
* @since 2020/4/15
* @since 2023/3/4
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class JobService {
public interface JobService {
private final InstanceService instanceService;
Long saveJob(SaveJobInfoRequest request);
private final DispatchService dispatchService;
JobInfoDO copyJob(Long jobId);
private final JobInfoRepository jobInfoRepository;
JobInfoDTO fetchJob(Long jobId);
private final InstanceInfoRepository instanceInfoRepository;
List<JobInfoDTO> fetchAllJob(Long appId);
private final TimingStrategyService timingStrategyService;
List<JobInfoDTO> queryJob(PowerQuery powerQuery);
/**
* 保存/修改任务
*
* @param request 任务请求
* @return 创建的任务IDjobId
*/
public Long saveJob(SaveJobInfoRequest request) {
long runJob(Long appId, Long jobId, String instanceParams, Long delay);
request.valid();
JobInfoDO jobInfoDO;
if (request.getId() != null) {
jobInfoDO = jobInfoRepository.findById(request.getId()).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + request.getId()));
} else {
jobInfoDO = new JobInfoDO();
}
// 值拷贝
BeanUtils.copyProperties(request, jobInfoDO);
// 拷贝枚举值
jobInfoDO.setExecuteType(request.getExecuteType().getV());
jobInfoDO.setProcessorType(request.getProcessorType().getV());
jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV());
jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
jobInfoDO.setDispatchStrategy(request.getDispatchStrategy().getV());
// 填充默认值非空保护防止 NPE
fillDefaultValue(jobInfoDO);
// 转化报警用户列表
if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) {
jobInfoDO.setNotifyUserIds(SJ.COMMA_JOINER.join(request.getNotifyUserIds()));
}
LifeCycle lifecycle = Optional.ofNullable(request.getLifeCycle()).orElse(LifeCycle.EMPTY_LIFE_CYCLE);
jobInfoDO.setLifecycle(JSON.toJSONString(lifecycle));
// 检查定时策略
timingStrategyService.validate(request.getTimeExpressionType(), request.getTimeExpression(), lifecycle.getStart(), lifecycle.getEnd());
calculateNextTriggerTime(jobInfoDO);
if (request.getId() == null) {
jobInfoDO.setGmtCreate(new Date());
}
// 检查告警配置
if (request.getAlarmConfig() != null) {
AlarmConfig config = request.getAlarmConfig();
if (config.getStatisticWindowLen() == null || config.getAlertThreshold() == null || config.getSilenceWindowLen() == null) {
throw new PowerJobException("illegal alarm config!");
}
jobInfoDO.setAlarmConfig(JSON.toJSONString(request.getAlarmConfig()));
}
// 日志配置
if (request.getLogConfig() != null) {
jobInfoDO.setLogConfig(JSONObject.toJSONString(request.getLogConfig()));
}
JobInfoDO res = jobInfoRepository.saveAndFlush(jobInfoDO);
return res.getId();
}
/**
* 复制任务
*
* @param jobId 目标任务ID
* @return 复制后的任务 ID
*/
public JobInfoDO copyJob(Long jobId) {
JobInfoDO origin = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId));
if (origin.getStatus() == SwitchableStatus.DELETED.getV()) {
throw new IllegalStateException("can't copy the job which has been deleted!");
}
JobInfoDO copyJob = new JobInfoDO();
// 值拷贝
BeanUtils.copyProperties(origin, copyJob);
// 填充默认值理论上应该不需要
fillDefaultValue(copyJob);
// 修正创建时间以及更新时间
copyJob.setId(null);
copyJob.setJobName(copyJob.getJobName() + "_COPY");
copyJob.setGmtCreate(new Date());
copyJob.setGmtModified(new Date());
copyJob = jobInfoRepository.saveAndFlush(copyJob);
return copyJob;
}
public JobInfoDTO fetchJob(Long jobId) {
return convert(jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId)));
}
public List<JobInfoDTO> fetchAllJob(Long appId) {
return jobInfoRepository.findByAppId(appId).stream().map(JobService::convert).collect(Collectors.toList());
}
public List<JobInfoDTO> queryJob(PowerQuery powerQuery) {
Specification<JobInfoDO> specification = QueryConvertUtils.toSpecification(powerQuery);
return jobInfoRepository.findAll(specification).stream().map(JobService::convert).collect(Collectors.toList());
}
/**
* 手动立即运行某个任务
*
* @param jobId 任务ID
* @param instanceParams 任务实例参数 OpenAPI 存在
* @param delay 延迟时间单位 毫秒
* @return 任务实例ID
*/
@DesignateServer
public long runJob(Long appId, Long jobId, String instanceParams, Long delay) {
delay = delay == null ? 0 : delay;
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
log.info("[Job-{}] try to run job in app[{}], instanceParams={},delay={} ms.", jobInfo.getId(), appId, instanceParams, delay);
final InstanceInfoDO instanceInfo = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
instanceInfoRepository.flush();
if (delay <= 0) {
dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.of(instanceInfo),Optional.empty());
} else {
InstanceTimeWheelService.schedule(instanceInfo.getInstanceId(), delay, () -> dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.empty(),Optional.empty()));
}
log.info("[Job-{}|{}] execute 'runJob' successfully, params={}", jobInfo.getId(), instanceInfo.getInstanceId(), instanceParams);
return instanceInfo.getInstanceId();
}
/**
* 删除某个任务
*
* @param jobId 任务ID
*/
public void deleteJob(Long jobId) {
shutdownOrStopJob(jobId, SwitchableStatus.DELETED);
}
/**
* 禁用某个任务
*/
public void disableJob(Long jobId) {
shutdownOrStopJob(jobId, SwitchableStatus.DISABLE);
}
/**
* 启用某个任务
*
* @param jobId 任务ID
*/
public void enableJob(Long jobId) {
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId));
jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV());
calculateNextTriggerTime(jobInfoDO);
jobInfoRepository.saveAndFlush(jobInfoDO);
}
/**
* 停止或删除某个JOB
* 秒级任务还要额外停止正在运行的任务实例
*/
private void shutdownOrStopJob(Long jobId, SwitchableStatus status) {
// 1. 先更新 job_info
Optional<JobInfoDO> jobInfoOPT = jobInfoRepository.findById(jobId);
if (!jobInfoOPT.isPresent()) {
throw new IllegalArgumentException("can't find job by jobId:" + jobId);
}
JobInfoDO jobInfoDO = jobInfoOPT.get();
jobInfoDO.setStatus(status.getV());
jobInfoDO.setGmtModified(new Date());
jobInfoRepository.saveAndFlush(jobInfoDO);
// 2. 关闭秒级任务
if (!TimeExpressionType.FREQUENT_TYPES.contains(jobInfoDO.getTimeExpressionType())) {
return;
}
List<InstanceInfoDO> executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.GENERALIZED_RUNNING_STATUS);
if (CollectionUtils.isEmpty(executeLogs)) {
return;
}
if (executeLogs.size() > 1) {
log.warn("[Job-{}] frequent job should just have one running instance, there must have some bug.", jobId);
}
executeLogs.forEach(instance -> {
try {
// 重复查询了数据库不过问题不大这个调用量很小
instanceService.stopInstance(instance.getAppId(), instance.getInstanceId());
} catch (Exception ignore) {
// ignore exception
}
});
}
private void calculateNextTriggerTime(JobInfoDO jobInfo) {
// 计算下次调度时间
if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) {
// 固定频率类型的任务不计算
jobInfo.setNextTriggerTime(null);
} else {
LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle());
Long nextValidTime = timingStrategyService.calculateNextTriggerTimeWithInspection(TimeExpressionType.of(jobInfo.getTimeExpressionType()), jobInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd());
jobInfo.setNextTriggerTime(nextValidTime);
}
// 重写最后修改时间
jobInfo.setGmtModified(new Date());
}
private void fillDefaultValue(JobInfoDO jobInfoDO) {
if (jobInfoDO.getMaxWorkerCount() == null) {
jobInfoDO.setMaxWorkerCount(0);
}
if (jobInfoDO.getMaxInstanceNum() == null) {
jobInfoDO.setMaxInstanceNum(0);
}
if (jobInfoDO.getConcurrency() == null) {
jobInfoDO.setConcurrency(5);
}
if (jobInfoDO.getInstanceRetryNum() == null) {
jobInfoDO.setInstanceRetryNum(0);
}
if (jobInfoDO.getTaskRetryNum() == null) {
jobInfoDO.setTaskRetryNum(0);
}
if (jobInfoDO.getInstanceTimeLimit() == null) {
jobInfoDO.setInstanceTimeLimit(0L);
}
}
private static JobInfoDTO convert(JobInfoDO jobInfoDO) {
JobInfoDTO jobInfoDTO = new JobInfoDTO();
BeanUtils.copyProperties(jobInfoDO, jobInfoDTO);
if (jobInfoDO.getAlarmConfig() != null) {
jobInfoDTO.setAlarmConfig(JSON.parseObject(jobInfoDO.getAlarmConfig(), AlarmConfig.class));
}
return jobInfoDTO;
}
void deleteJob(Long jobId);
void disableJob(Long jobId);
void enableJob(Long jobId);
SaveJobInfoRequest exportJob(Long jobId);
}

View File

@ -0,0 +1,39 @@
package tech.powerjob.server.core.service.impl;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.server.core.service.AppInfoService;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import java.util.Objects;
/**
* AppInfoServiceImpl
*
* @author tjq
* @since 2023/3/4
*/
@Service
@RequiredArgsConstructor
public class AppInfoServiceImpl implements AppInfoService {
private final AppInfoRepository appInfoRepository;
/**
* 验证应用访问权限
* @param appName 应用名称
* @param password 密码
* @return 应用ID
*/
@Override
public Long assertApp(String appName, String password) {
AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new PowerJobException("can't find appInfo by appName: " + appName));
if (Objects.equals(appInfo.getPassword(), password)) {
return appInfo.getId();
}
throw new PowerJobException("password error!");
}
}

View File

@ -0,0 +1,55 @@
package tech.powerjob.server.core.service.impl.job;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import tech.powerjob.common.enums.DispatchStrategy;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.response.JobInfoDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import java.util.stream.Collectors;
/**
* JobConverter
*
* @author tjq
* @since 2023/3/4
*/
public class JobConverter {
public static SaveJobInfoRequest convertJobInfoDO2SaveJobInfoRequest(JobInfoDO jobInfoDO) {
SaveJobInfoRequest saveJobInfoRequest = new SaveJobInfoRequest();
BeanUtils.copyProperties(jobInfoDO, saveJobInfoRequest);
saveJobInfoRequest.setTimeExpressionType(TimeExpressionType.of(jobInfoDO.getTimeExpressionType()));
saveJobInfoRequest.setExecuteType(ExecuteType.of(jobInfoDO.getExecuteType()));
saveJobInfoRequest.setProcessorType(ProcessorType.of(jobInfoDO.getProcessorType()));
if (StringUtils.isNotEmpty(jobInfoDO.getNotifyUserIds())) {
saveJobInfoRequest.setNotifyUserIds(Lists.newArrayList(SJ.COMMA_SPLITTER.split(jobInfoDO.getNotifyUserIds())).stream().map(Long::valueOf).collect(Collectors.toList()));
}
saveJobInfoRequest.setDispatchStrategy(DispatchStrategy.of(jobInfoDO.getDispatchStrategy()));
saveJobInfoRequest.setLifeCycle(LifeCycle.parse(jobInfoDO.getLifecycle()));
saveJobInfoRequest.setAlarmConfig(JsonUtils.parseObjectIgnoreException(jobInfoDO.getAlarmConfig(), AlarmConfig.class));
saveJobInfoRequest.setLogConfig(JsonUtils.parseObjectIgnoreException(jobInfoDO.getLogConfig(), LogConfig.class));
return saveJobInfoRequest;
}
public static JobInfoDTO convertJobInfoDO2JobInfoDTO(JobInfoDO jobInfoDO) {
JobInfoDTO jobInfoDTO = new JobInfoDTO();
BeanUtils.copyProperties(jobInfoDO, jobInfoDTO);
if (jobInfoDO.getAlarmConfig() != null) {
jobInfoDTO.setAlarmConfig(JSON.parseObject(jobInfoDO.getAlarmConfig(), AlarmConfig.class));
}
return jobInfoDTO;
}
}

View File

@ -0,0 +1,314 @@
package tech.powerjob.server.core.service.impl.job;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.response.JobInfoDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.core.scheduler.TimingStrategyService;
import tech.powerjob.server.core.service.JobService;
import tech.powerjob.server.persistence.QueryConvertUtils;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* JobServiceImpl
*
* @author tjq
* @since 2023/3/4
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class JobServiceImpl implements JobService {
private final InstanceService instanceService;
private final DispatchService dispatchService;
private final JobInfoRepository jobInfoRepository;
private final InstanceInfoRepository instanceInfoRepository;
private final TimingStrategyService timingStrategyService;
/**
* 保存/修改任务
*
* @param request 任务请求
* @return 创建的任务IDjobId
*/
@Override
public Long saveJob(SaveJobInfoRequest request) {
request.valid();
JobInfoDO jobInfoDO;
if (request.getId() != null) {
jobInfoDO = jobInfoRepository.findById(request.getId()).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + request.getId()));
} else {
jobInfoDO = new JobInfoDO();
}
// 值拷贝
BeanUtils.copyProperties(request, jobInfoDO);
// 拷贝枚举值
jobInfoDO.setExecuteType(request.getExecuteType().getV());
jobInfoDO.setProcessorType(request.getProcessorType().getV());
jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV());
jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
jobInfoDO.setDispatchStrategy(request.getDispatchStrategy().getV());
// 填充默认值非空保护防止 NPE
fillDefaultValue(jobInfoDO);
// 转化报警用户列表
if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) {
jobInfoDO.setNotifyUserIds(SJ.COMMA_JOINER.join(request.getNotifyUserIds()));
}
LifeCycle lifecycle = Optional.ofNullable(request.getLifeCycle()).orElse(LifeCycle.EMPTY_LIFE_CYCLE);
jobInfoDO.setLifecycle(JSON.toJSONString(lifecycle));
// 检查定时策略
timingStrategyService.validate(request.getTimeExpressionType(), request.getTimeExpression(), lifecycle.getStart(), lifecycle.getEnd());
calculateNextTriggerTime(jobInfoDO);
if (request.getId() == null) {
jobInfoDO.setGmtCreate(new Date());
}
// 检查告警配置
if (request.getAlarmConfig() != null) {
AlarmConfig config = request.getAlarmConfig();
if (config.getStatisticWindowLen() == null || config.getAlertThreshold() == null || config.getSilenceWindowLen() == null) {
throw new PowerJobException("illegal alarm config!");
}
jobInfoDO.setAlarmConfig(JSON.toJSONString(request.getAlarmConfig()));
}
// 日志配置
if (request.getLogConfig() != null) {
jobInfoDO.setLogConfig(JSONObject.toJSONString(request.getLogConfig()));
}
JobInfoDO res = jobInfoRepository.saveAndFlush(jobInfoDO);
return res.getId();
}
/**
* 复制任务
*
* @param jobId 目标任务ID
* @return 复制后的任务 ID
*/
@Override
public JobInfoDO copyJob(Long jobId) {
JobInfoDO origin = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId));
if (origin.getStatus() == SwitchableStatus.DELETED.getV()) {
throw new IllegalStateException("can't copy the job which has been deleted!");
}
JobInfoDO copyJob = new JobInfoDO();
// 值拷贝
BeanUtils.copyProperties(origin, copyJob);
// 填充默认值理论上应该不需要
fillDefaultValue(copyJob);
// 修正创建时间以及更新时间
copyJob.setId(null);
copyJob.setJobName(copyJob.getJobName() + "_COPY");
copyJob.setGmtCreate(new Date());
copyJob.setGmtModified(new Date());
copyJob = jobInfoRepository.saveAndFlush(copyJob);
return copyJob;
}
@Override
public JobInfoDTO fetchJob(Long jobId) {
return JobConverter.convertJobInfoDO2JobInfoDTO(jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId)));
}
@Override
public List<JobInfoDTO> fetchAllJob(Long appId) {
return jobInfoRepository.findByAppId(appId).stream().map(JobConverter::convertJobInfoDO2JobInfoDTO).collect(Collectors.toList());
}
@Override
public List<JobInfoDTO> queryJob(PowerQuery powerQuery) {
Specification<JobInfoDO> specification = QueryConvertUtils.toSpecification(powerQuery);
return jobInfoRepository.findAll(specification).stream().map(JobConverter::convertJobInfoDO2JobInfoDTO).collect(Collectors.toList());
}
/**
* 手动立即运行某个任务
*
* @param jobId 任务ID
* @param instanceParams 任务实例参数 OpenAPI 存在
* @param delay 延迟时间单位 毫秒
* @return 任务实例ID
*/
@Override
@DesignateServer
public long runJob(Long appId, Long jobId, String instanceParams, Long delay) {
delay = delay == null ? 0 : delay;
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
log.info("[Job-{}] try to run job in app[{}], instanceParams={},delay={} ms.", jobInfo.getId(), appId, instanceParams, delay);
final InstanceInfoDO instanceInfo = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
instanceInfoRepository.flush();
if (delay <= 0) {
dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.of(instanceInfo),Optional.empty());
} else {
InstanceTimeWheelService.schedule(instanceInfo.getInstanceId(), delay, () -> dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.empty(),Optional.empty()));
}
log.info("[Job-{}|{}] execute 'runJob' successfully, params={}", jobInfo.getId(), instanceInfo.getInstanceId(), instanceParams);
return instanceInfo.getInstanceId();
}
/**
* 删除某个任务
*
* @param jobId 任务ID
*/
@Override
public void deleteJob(Long jobId) {
shutdownOrStopJob(jobId, SwitchableStatus.DELETED);
}
/**
* 禁用某个任务
*/
@Override
public void disableJob(Long jobId) {
shutdownOrStopJob(jobId, SwitchableStatus.DISABLE);
}
/**
* 导出某个任务为 JSON
* @param jobId jobId
* @return 导出结果
*/
@Override
public SaveJobInfoRequest exportJob(Long jobId) {
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
if (!jobInfoOpt.isPresent()) {
throw new IllegalArgumentException("can't find job by jobId: " + jobId);
}
final JobInfoDO jobInfoDO = jobInfoOpt.get();
final SaveJobInfoRequest saveJobInfoRequest = JobConverter.convertJobInfoDO2SaveJobInfoRequest(jobInfoDO);
saveJobInfoRequest.setId(null);
log.info("[Job-{}] [exportJob] jobInfoDO: {}, saveJobInfoRequest: {}", jobId, JsonUtils.toJSONString(jobInfoDO), JsonUtils.toJSONString(saveJobInfoRequest));
return saveJobInfoRequest;
}
/**
* 启用某个任务
*
* @param jobId 任务ID
*/
@Override
public void enableJob(Long jobId) {
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId));
jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV());
calculateNextTriggerTime(jobInfoDO);
jobInfoRepository.saveAndFlush(jobInfoDO);
}
/**
* 停止或删除某个JOB
* 秒级任务还要额外停止正在运行的任务实例
*/
private void shutdownOrStopJob(Long jobId, SwitchableStatus status) {
// 1. 先更新 job_info
Optional<JobInfoDO> jobInfoOPT = jobInfoRepository.findById(jobId);
if (!jobInfoOPT.isPresent()) {
throw new IllegalArgumentException("can't find job by jobId:" + jobId);
}
JobInfoDO jobInfoDO = jobInfoOPT.get();
jobInfoDO.setStatus(status.getV());
jobInfoDO.setGmtModified(new Date());
jobInfoRepository.saveAndFlush(jobInfoDO);
// 2. 关闭秒级任务
if (!TimeExpressionType.FREQUENT_TYPES.contains(jobInfoDO.getTimeExpressionType())) {
return;
}
List<InstanceInfoDO> executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.GENERALIZED_RUNNING_STATUS);
if (CollectionUtils.isEmpty(executeLogs)) {
return;
}
if (executeLogs.size() > 1) {
log.warn("[Job-{}] frequent job should just have one running instance, there must have some bug.", jobId);
}
executeLogs.forEach(instance -> {
try {
// 重复查询了数据库不过问题不大这个调用量很小
instanceService.stopInstance(instance.getAppId(), instance.getInstanceId());
} catch (Exception ignore) {
// ignore exception
}
});
}
private void calculateNextTriggerTime(JobInfoDO jobInfo) {
// 计算下次调度时间
if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) {
// 固定频率类型的任务不计算
jobInfo.setNextTriggerTime(null);
} else {
LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle());
Long nextValidTime = timingStrategyService.calculateNextTriggerTimeWithInspection(TimeExpressionType.of(jobInfo.getTimeExpressionType()), jobInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd());
jobInfo.setNextTriggerTime(nextValidTime);
}
// 重写最后修改时间
jobInfo.setGmtModified(new Date());
}
private void fillDefaultValue(JobInfoDO jobInfoDO) {
if (jobInfoDO.getMaxWorkerCount() == null) {
jobInfoDO.setMaxWorkerCount(0);
}
if (jobInfoDO.getMaxInstanceNum() == null) {
jobInfoDO.setMaxInstanceNum(0);
}
if (jobInfoDO.getConcurrency() == null) {
jobInfoDO.setConcurrency(5);
}
if (jobInfoDO.getInstanceRetryNum() == null) {
jobInfoDO.setInstanceRetryNum(0);
}
if (jobInfoDO.getTaskRetryNum() == null) {
jobInfoDO.setTaskRetryNum(0);
}
if (jobInfoDO.getInstanceTimeLimit() == null) {
jobInfoDO.setInstanceTimeLimit(0L);
}
}
}

View File

@ -49,6 +49,10 @@ public class JobController {
return ResultDTO.success(JobInfoVO.from(jobService.copyJob(Long.valueOf(jobId))));
}
@GetMapping("/export")
public ResultDTO<SaveJobInfoRequest> exportJob(String jobId) {
return ResultDTO.success(jobService.exportJob(Long.valueOf(jobId)));
}
@GetMapping("/disable")
public ResultDTO<Void> disableJob(String jobId) {