diff --git a/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java index 77ca831c..985b8df9 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.json.JsonMapper; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import tech.powerjob.common.exception.PowerJobException; @@ -73,6 +74,9 @@ public class JsonUtils { } public static T parseObjectIgnoreException(String json, Class clz) { + if (StringUtils.isEmpty(json)) { + return null; + } try { return JSON_MAPPER.readValue(json, clz); }catch (Exception e) { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/AppInfoService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/AppInfoService.java index e768bd77..4e81b048 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/AppInfoService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/AppInfoService.java @@ -1,39 +1,11 @@ package tech.powerjob.server.core.service; -import lombok.RequiredArgsConstructor; -import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.server.persistence.remote.model.AppInfoDO; -import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; -import org.springframework.stereotype.Service; - -import javax.annotation.Resource; -import java.util.Objects; - /** - * 应用信息服务 + * AppInfoService * * @author tjq - * @since 2020/6/20 + * @since 2023/3/4 */ -@Service -@RequiredArgsConstructor -public class AppInfoService { - - private final AppInfoRepository appInfoRepository; - - /** - * 验证应用访问权限 - * @param appName 应用名称 - * @param password 密码 - * @return 应用ID - */ - public Long assertApp(String appName, String password) { - - AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new PowerJobException("can't find appInfo by appName: " + appName)); - if (Objects.equals(appInfo.getPassword(), password)) { - return appInfo.getId(); - } - throw new PowerJobException("password error!"); - } - +public interface AppInfoService { + Long assertApp(String appName, String password); } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java index dc58de93..1ad88b8d 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java @@ -1,298 +1,37 @@ package tech.powerjob.server.core.service; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; -import org.springframework.data.jpa.domain.Specification; -import org.springframework.stereotype.Service; -import org.springframework.util.CollectionUtils; import tech.powerjob.common.PowerQuery; -import tech.powerjob.common.enums.InstanceStatus; -import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.common.model.AlarmConfig; -import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.request.http.SaveJobInfoRequest; import tech.powerjob.common.response.JobInfoDTO; -import tech.powerjob.server.common.SJ; -import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; -import tech.powerjob.server.core.DispatchService; -import tech.powerjob.server.core.instance.InstanceService; -import tech.powerjob.server.core.scheduler.TimingStrategyService; -import tech.powerjob.server.persistence.QueryConvertUtils; -import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; -import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; -import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; -import tech.powerjob.server.remote.server.redirector.DesignateServer; -import javax.annotation.Resource; -import java.text.ParseException; -import java.util.Date; import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; /** - * 任务服务 + * JobService * * @author tjq - * @since 2020/4/15 + * @since 2023/3/4 */ -@Slf4j -@Service -@RequiredArgsConstructor -public class JobService { +public interface JobService { - private final InstanceService instanceService; + Long saveJob(SaveJobInfoRequest request); - private final DispatchService dispatchService; + JobInfoDO copyJob(Long jobId); - private final JobInfoRepository jobInfoRepository; + JobInfoDTO fetchJob(Long jobId); - private final InstanceInfoRepository instanceInfoRepository; + List fetchAllJob(Long appId); - private final TimingStrategyService timingStrategyService; + List queryJob(PowerQuery powerQuery); - /** - * 保存/修改任务 - * - * @param request 任务请求 - * @return 创建的任务ID(jobId) - */ - public Long saveJob(SaveJobInfoRequest request) { + long runJob(Long appId, Long jobId, String instanceParams, Long delay); - request.valid(); + void deleteJob(Long jobId); - JobInfoDO jobInfoDO; - if (request.getId() != null) { - jobInfoDO = jobInfoRepository.findById(request.getId()).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + request.getId())); - } else { - jobInfoDO = new JobInfoDO(); - } + void disableJob(Long jobId); - // 值拷贝 - BeanUtils.copyProperties(request, jobInfoDO); - - // 拷贝枚举值 - jobInfoDO.setExecuteType(request.getExecuteType().getV()); - jobInfoDO.setProcessorType(request.getProcessorType().getV()); - jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV()); - jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV()); - jobInfoDO.setDispatchStrategy(request.getDispatchStrategy().getV()); - - // 填充默认值,非空保护防止 NPE - fillDefaultValue(jobInfoDO); - - // 转化报警用户列表 - if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) { - jobInfoDO.setNotifyUserIds(SJ.COMMA_JOINER.join(request.getNotifyUserIds())); - } - LifeCycle lifecycle = Optional.ofNullable(request.getLifeCycle()).orElse(LifeCycle.EMPTY_LIFE_CYCLE); - jobInfoDO.setLifecycle(JSON.toJSONString(lifecycle)); - // 检查定时策略 - timingStrategyService.validate(request.getTimeExpressionType(), request.getTimeExpression(), lifecycle.getStart(), lifecycle.getEnd()); - calculateNextTriggerTime(jobInfoDO); - if (request.getId() == null) { - jobInfoDO.setGmtCreate(new Date()); - } - // 检查告警配置 - if (request.getAlarmConfig() != null) { - AlarmConfig config = request.getAlarmConfig(); - if (config.getStatisticWindowLen() == null || config.getAlertThreshold() == null || config.getSilenceWindowLen() == null) { - throw new PowerJobException("illegal alarm config!"); - } - jobInfoDO.setAlarmConfig(JSON.toJSONString(request.getAlarmConfig())); - } - // 日志配置 - if (request.getLogConfig() != null) { - jobInfoDO.setLogConfig(JSONObject.toJSONString(request.getLogConfig())); - } - JobInfoDO res = jobInfoRepository.saveAndFlush(jobInfoDO); - return res.getId(); - } - - /** - * 复制任务 - * - * @param jobId 目标任务ID - * @return 复制后的任务 ID - */ - public JobInfoDO copyJob(Long jobId) { - - JobInfoDO origin = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId)); - if (origin.getStatus() == SwitchableStatus.DELETED.getV()) { - throw new IllegalStateException("can't copy the job which has been deleted!"); - } - JobInfoDO copyJob = new JobInfoDO(); - // 值拷贝 - BeanUtils.copyProperties(origin, copyJob); - // 填充默认值,理论上应该不需要 - fillDefaultValue(copyJob); - // 修正创建时间以及更新时间 - copyJob.setId(null); - copyJob.setJobName(copyJob.getJobName() + "_COPY"); - copyJob.setGmtCreate(new Date()); - copyJob.setGmtModified(new Date()); - - copyJob = jobInfoRepository.saveAndFlush(copyJob); - return copyJob; - - } - - - public JobInfoDTO fetchJob(Long jobId) { - return convert(jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId))); - } - - public List fetchAllJob(Long appId) { - return jobInfoRepository.findByAppId(appId).stream().map(JobService::convert).collect(Collectors.toList()); - } - - public List queryJob(PowerQuery powerQuery) { - Specification specification = QueryConvertUtils.toSpecification(powerQuery); - return jobInfoRepository.findAll(specification).stream().map(JobService::convert).collect(Collectors.toList()); - } - - /** - * 手动立即运行某个任务 - * - * @param jobId 任务ID - * @param instanceParams 任务实例参数(仅 OpenAPI 存在) - * @param delay 延迟时间,单位 毫秒 - * @return 任务实例ID - */ - @DesignateServer - public long runJob(Long appId, Long jobId, String instanceParams, Long delay) { - - delay = delay == null ? 0 : delay; - JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); - - log.info("[Job-{}] try to run job in app[{}], instanceParams={},delay={} ms.", jobInfo.getId(), appId, instanceParams, delay); - final InstanceInfoDO instanceInfo = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0)); - instanceInfoRepository.flush(); - if (delay <= 0) { - dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.of(instanceInfo),Optional.empty()); - } else { - InstanceTimeWheelService.schedule(instanceInfo.getInstanceId(), delay, () -> dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.empty(),Optional.empty())); - } - log.info("[Job-{}|{}] execute 'runJob' successfully, params={}", jobInfo.getId(), instanceInfo.getInstanceId(), instanceParams); - return instanceInfo.getInstanceId(); - } - - - /** - * 删除某个任务 - * - * @param jobId 任务ID - */ - public void deleteJob(Long jobId) { - shutdownOrStopJob(jobId, SwitchableStatus.DELETED); - } - - /** - * 禁用某个任务 - */ - public void disableJob(Long jobId) { - shutdownOrStopJob(jobId, SwitchableStatus.DISABLE); - } - - /** - * 启用某个任务 - * - * @param jobId 任务ID - */ - public void enableJob(Long jobId) { - JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId)); - - jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV()); - calculateNextTriggerTime(jobInfoDO); - - jobInfoRepository.saveAndFlush(jobInfoDO); - } - - /** - * 停止或删除某个JOB - * 秒级任务还要额外停止正在运行的任务实例 - */ - private void shutdownOrStopJob(Long jobId, SwitchableStatus status) { - - // 1. 先更新 job_info 表 - Optional jobInfoOPT = jobInfoRepository.findById(jobId); - if (!jobInfoOPT.isPresent()) { - throw new IllegalArgumentException("can't find job by jobId:" + jobId); - } - JobInfoDO jobInfoDO = jobInfoOPT.get(); - jobInfoDO.setStatus(status.getV()); - jobInfoDO.setGmtModified(new Date()); - jobInfoRepository.saveAndFlush(jobInfoDO); - - // 2. 关闭秒级任务 - if (!TimeExpressionType.FREQUENT_TYPES.contains(jobInfoDO.getTimeExpressionType())) { - return; - } - List executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.GENERALIZED_RUNNING_STATUS); - if (CollectionUtils.isEmpty(executeLogs)) { - return; - } - if (executeLogs.size() > 1) { - log.warn("[Job-{}] frequent job should just have one running instance, there must have some bug.", jobId); - } - executeLogs.forEach(instance -> { - try { - // 重复查询了数据库,不过问题不大,这个调用量很小 - instanceService.stopInstance(instance.getAppId(), instance.getInstanceId()); - } catch (Exception ignore) { - // ignore exception - } - }); - } - - private void calculateNextTriggerTime(JobInfoDO jobInfo) { - // 计算下次调度时间 - if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) { - // 固定频率类型的任务不计算 - jobInfo.setNextTriggerTime(null); - } else { - LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle()); - Long nextValidTime = timingStrategyService.calculateNextTriggerTimeWithInspection(TimeExpressionType.of(jobInfo.getTimeExpressionType()), jobInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd()); - jobInfo.setNextTriggerTime(nextValidTime); - } - // 重写最后修改时间 - jobInfo.setGmtModified(new Date()); - } - - private void fillDefaultValue(JobInfoDO jobInfoDO) { - if (jobInfoDO.getMaxWorkerCount() == null) { - jobInfoDO.setMaxWorkerCount(0); - } - if (jobInfoDO.getMaxInstanceNum() == null) { - jobInfoDO.setMaxInstanceNum(0); - } - if (jobInfoDO.getConcurrency() == null) { - jobInfoDO.setConcurrency(5); - } - if (jobInfoDO.getInstanceRetryNum() == null) { - jobInfoDO.setInstanceRetryNum(0); - } - if (jobInfoDO.getTaskRetryNum() == null) { - jobInfoDO.setTaskRetryNum(0); - } - if (jobInfoDO.getInstanceTimeLimit() == null) { - jobInfoDO.setInstanceTimeLimit(0L); - } - } - - private static JobInfoDTO convert(JobInfoDO jobInfoDO) { - JobInfoDTO jobInfoDTO = new JobInfoDTO(); - BeanUtils.copyProperties(jobInfoDO, jobInfoDTO); - if (jobInfoDO.getAlarmConfig() != null) { - jobInfoDTO.setAlarmConfig(JSON.parseObject(jobInfoDO.getAlarmConfig(), AlarmConfig.class)); - } - return jobInfoDTO; - } + void enableJob(Long jobId); + SaveJobInfoRequest exportJob(Long jobId); } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/AppInfoServiceImpl.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/AppInfoServiceImpl.java new file mode 100644 index 00000000..b5e96ed0 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/AppInfoServiceImpl.java @@ -0,0 +1,39 @@ +package tech.powerjob.server.core.service.impl; + +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.server.core.service.AppInfoService; +import tech.powerjob.server.persistence.remote.model.AppInfoDO; +import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; + +import java.util.Objects; + +/** + * AppInfoServiceImpl + * + * @author tjq + * @since 2023/3/4 + */ +@Service +@RequiredArgsConstructor +public class AppInfoServiceImpl implements AppInfoService { + + private final AppInfoRepository appInfoRepository; + + /** + * 验证应用访问权限 + * @param appName 应用名称 + * @param password 密码 + * @return 应用ID + */ + @Override + public Long assertApp(String appName, String password) { + + AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new PowerJobException("can't find appInfo by appName: " + appName)); + if (Objects.equals(appInfo.getPassword(), password)) { + return appInfo.getId(); + } + throw new PowerJobException("password error!"); + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobConverter.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobConverter.java new file mode 100644 index 00000000..7c45f4f2 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobConverter.java @@ -0,0 +1,55 @@ +package tech.powerjob.server.core.service.impl.job; + +import com.alibaba.fastjson.JSON; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.BeanUtils; +import tech.powerjob.common.enums.DispatchStrategy; +import tech.powerjob.common.enums.ExecuteType; +import tech.powerjob.common.enums.ProcessorType; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.model.AlarmConfig; +import tech.powerjob.common.model.LifeCycle; +import tech.powerjob.common.model.LogConfig; +import tech.powerjob.common.request.http.SaveJobInfoRequest; +import tech.powerjob.common.response.JobInfoDTO; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.server.common.SJ; +import tech.powerjob.server.persistence.remote.model.JobInfoDO; + +import java.util.stream.Collectors; + +/** + * JobConverter + * + * @author tjq + * @since 2023/3/4 + */ +public class JobConverter { + + public static SaveJobInfoRequest convertJobInfoDO2SaveJobInfoRequest(JobInfoDO jobInfoDO) { + SaveJobInfoRequest saveJobInfoRequest = new SaveJobInfoRequest(); + BeanUtils.copyProperties(jobInfoDO, saveJobInfoRequest); + saveJobInfoRequest.setTimeExpressionType(TimeExpressionType.of(jobInfoDO.getTimeExpressionType())); + saveJobInfoRequest.setExecuteType(ExecuteType.of(jobInfoDO.getExecuteType())); + saveJobInfoRequest.setProcessorType(ProcessorType.of(jobInfoDO.getProcessorType())); + if (StringUtils.isNotEmpty(jobInfoDO.getNotifyUserIds())) { + saveJobInfoRequest.setNotifyUserIds(Lists.newArrayList(SJ.COMMA_SPLITTER.split(jobInfoDO.getNotifyUserIds())).stream().map(Long::valueOf).collect(Collectors.toList())); + } + saveJobInfoRequest.setDispatchStrategy(DispatchStrategy.of(jobInfoDO.getDispatchStrategy())); + saveJobInfoRequest.setLifeCycle(LifeCycle.parse(jobInfoDO.getLifecycle())); + saveJobInfoRequest.setAlarmConfig(JsonUtils.parseObjectIgnoreException(jobInfoDO.getAlarmConfig(), AlarmConfig.class)); + saveJobInfoRequest.setLogConfig(JsonUtils.parseObjectIgnoreException(jobInfoDO.getLogConfig(), LogConfig.class)); + return saveJobInfoRequest; + } + + public static JobInfoDTO convertJobInfoDO2JobInfoDTO(JobInfoDO jobInfoDO) { + JobInfoDTO jobInfoDTO = new JobInfoDTO(); + BeanUtils.copyProperties(jobInfoDO, jobInfoDTO); + if (jobInfoDO.getAlarmConfig() != null) { + jobInfoDTO.setAlarmConfig(JSON.parseObject(jobInfoDO.getAlarmConfig(), AlarmConfig.class)); + } + return jobInfoDTO; + } + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobServiceImpl.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobServiceImpl.java new file mode 100644 index 00000000..9fd0536d --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobServiceImpl.java @@ -0,0 +1,314 @@ +package tech.powerjob.server.core.service.impl.job; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.data.jpa.domain.Specification; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import tech.powerjob.common.PowerQuery; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.model.AlarmConfig; +import tech.powerjob.common.model.LifeCycle; +import tech.powerjob.common.request.http.SaveJobInfoRequest; +import tech.powerjob.common.response.JobInfoDTO; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.server.common.SJ; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; +import tech.powerjob.server.core.DispatchService; +import tech.powerjob.server.core.instance.InstanceService; +import tech.powerjob.server.core.scheduler.TimingStrategyService; +import tech.powerjob.server.core.service.JobService; +import tech.powerjob.server.persistence.QueryConvertUtils; +import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; +import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; +import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; +import tech.powerjob.server.remote.server.redirector.DesignateServer; + +import java.util.Date; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * JobServiceImpl + * + * @author tjq + * @since 2023/3/4 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class JobServiceImpl implements JobService { + + private final InstanceService instanceService; + + private final DispatchService dispatchService; + + private final JobInfoRepository jobInfoRepository; + + private final InstanceInfoRepository instanceInfoRepository; + + private final TimingStrategyService timingStrategyService; + + /** + * 保存/修改任务 + * + * @param request 任务请求 + * @return 创建的任务ID(jobId) + */ + @Override + public Long saveJob(SaveJobInfoRequest request) { + + request.valid(); + + JobInfoDO jobInfoDO; + if (request.getId() != null) { + jobInfoDO = jobInfoRepository.findById(request.getId()).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + request.getId())); + } else { + jobInfoDO = new JobInfoDO(); + } + + // 值拷贝 + BeanUtils.copyProperties(request, jobInfoDO); + + // 拷贝枚举值 + jobInfoDO.setExecuteType(request.getExecuteType().getV()); + jobInfoDO.setProcessorType(request.getProcessorType().getV()); + jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV()); + jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV()); + jobInfoDO.setDispatchStrategy(request.getDispatchStrategy().getV()); + + // 填充默认值,非空保护防止 NPE + fillDefaultValue(jobInfoDO); + + // 转化报警用户列表 + if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) { + jobInfoDO.setNotifyUserIds(SJ.COMMA_JOINER.join(request.getNotifyUserIds())); + } + LifeCycle lifecycle = Optional.ofNullable(request.getLifeCycle()).orElse(LifeCycle.EMPTY_LIFE_CYCLE); + jobInfoDO.setLifecycle(JSON.toJSONString(lifecycle)); + // 检查定时策略 + timingStrategyService.validate(request.getTimeExpressionType(), request.getTimeExpression(), lifecycle.getStart(), lifecycle.getEnd()); + calculateNextTriggerTime(jobInfoDO); + if (request.getId() == null) { + jobInfoDO.setGmtCreate(new Date()); + } + // 检查告警配置 + if (request.getAlarmConfig() != null) { + AlarmConfig config = request.getAlarmConfig(); + if (config.getStatisticWindowLen() == null || config.getAlertThreshold() == null || config.getSilenceWindowLen() == null) { + throw new PowerJobException("illegal alarm config!"); + } + jobInfoDO.setAlarmConfig(JSON.toJSONString(request.getAlarmConfig())); + } + // 日志配置 + if (request.getLogConfig() != null) { + jobInfoDO.setLogConfig(JSONObject.toJSONString(request.getLogConfig())); + } + JobInfoDO res = jobInfoRepository.saveAndFlush(jobInfoDO); + return res.getId(); + } + + /** + * 复制任务 + * + * @param jobId 目标任务ID + * @return 复制后的任务 ID + */ + @Override + public JobInfoDO copyJob(Long jobId) { + + JobInfoDO origin = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId)); + if (origin.getStatus() == SwitchableStatus.DELETED.getV()) { + throw new IllegalStateException("can't copy the job which has been deleted!"); + } + JobInfoDO copyJob = new JobInfoDO(); + // 值拷贝 + BeanUtils.copyProperties(origin, copyJob); + // 填充默认值,理论上应该不需要 + fillDefaultValue(copyJob); + // 修正创建时间以及更新时间 + copyJob.setId(null); + copyJob.setJobName(copyJob.getJobName() + "_COPY"); + copyJob.setGmtCreate(new Date()); + copyJob.setGmtModified(new Date()); + + copyJob = jobInfoRepository.saveAndFlush(copyJob); + return copyJob; + + } + + @Override + public JobInfoDTO fetchJob(Long jobId) { + return JobConverter.convertJobInfoDO2JobInfoDTO(jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId))); + } + + @Override + public List fetchAllJob(Long appId) { + return jobInfoRepository.findByAppId(appId).stream().map(JobConverter::convertJobInfoDO2JobInfoDTO).collect(Collectors.toList()); + } + + @Override + public List queryJob(PowerQuery powerQuery) { + Specification specification = QueryConvertUtils.toSpecification(powerQuery); + return jobInfoRepository.findAll(specification).stream().map(JobConverter::convertJobInfoDO2JobInfoDTO).collect(Collectors.toList()); + } + + /** + * 手动立即运行某个任务 + * + * @param jobId 任务ID + * @param instanceParams 任务实例参数(仅 OpenAPI 存在) + * @param delay 延迟时间,单位 毫秒 + * @return 任务实例ID + */ + @Override + @DesignateServer + public long runJob(Long appId, Long jobId, String instanceParams, Long delay) { + + delay = delay == null ? 0 : delay; + JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); + + log.info("[Job-{}] try to run job in app[{}], instanceParams={},delay={} ms.", jobInfo.getId(), appId, instanceParams, delay); + final InstanceInfoDO instanceInfo = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0)); + instanceInfoRepository.flush(); + if (delay <= 0) { + dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.of(instanceInfo),Optional.empty()); + } else { + InstanceTimeWheelService.schedule(instanceInfo.getInstanceId(), delay, () -> dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.empty(),Optional.empty())); + } + log.info("[Job-{}|{}] execute 'runJob' successfully, params={}", jobInfo.getId(), instanceInfo.getInstanceId(), instanceParams); + return instanceInfo.getInstanceId(); + } + + + /** + * 删除某个任务 + * + * @param jobId 任务ID + */ + @Override + public void deleteJob(Long jobId) { + shutdownOrStopJob(jobId, SwitchableStatus.DELETED); + } + + /** + * 禁用某个任务 + */ + @Override + public void disableJob(Long jobId) { + shutdownOrStopJob(jobId, SwitchableStatus.DISABLE); + } + + /** + * 导出某个任务为 JSON + * @param jobId jobId + * @return 导出结果 + */ + @Override + public SaveJobInfoRequest exportJob(Long jobId) { + Optional jobInfoOpt = jobInfoRepository.findById(jobId); + if (!jobInfoOpt.isPresent()) { + throw new IllegalArgumentException("can't find job by jobId: " + jobId); + } + final JobInfoDO jobInfoDO = jobInfoOpt.get(); + final SaveJobInfoRequest saveJobInfoRequest = JobConverter.convertJobInfoDO2SaveJobInfoRequest(jobInfoDO); + saveJobInfoRequest.setId(null); + log.info("[Job-{}] [exportJob] jobInfoDO: {}, saveJobInfoRequest: {}", jobId, JsonUtils.toJSONString(jobInfoDO), JsonUtils.toJSONString(saveJobInfoRequest)); + return saveJobInfoRequest; + } + + /** + * 启用某个任务 + * + * @param jobId 任务ID + */ + @Override + public void enableJob(Long jobId) { + JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId)); + + jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV()); + calculateNextTriggerTime(jobInfoDO); + + jobInfoRepository.saveAndFlush(jobInfoDO); + } + + /** + * 停止或删除某个JOB + * 秒级任务还要额外停止正在运行的任务实例 + */ + private void shutdownOrStopJob(Long jobId, SwitchableStatus status) { + + // 1. 先更新 job_info 表 + Optional jobInfoOPT = jobInfoRepository.findById(jobId); + if (!jobInfoOPT.isPresent()) { + throw new IllegalArgumentException("can't find job by jobId:" + jobId); + } + JobInfoDO jobInfoDO = jobInfoOPT.get(); + jobInfoDO.setStatus(status.getV()); + jobInfoDO.setGmtModified(new Date()); + jobInfoRepository.saveAndFlush(jobInfoDO); + + // 2. 关闭秒级任务 + if (!TimeExpressionType.FREQUENT_TYPES.contains(jobInfoDO.getTimeExpressionType())) { + return; + } + List executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.GENERALIZED_RUNNING_STATUS); + if (CollectionUtils.isEmpty(executeLogs)) { + return; + } + if (executeLogs.size() > 1) { + log.warn("[Job-{}] frequent job should just have one running instance, there must have some bug.", jobId); + } + executeLogs.forEach(instance -> { + try { + // 重复查询了数据库,不过问题不大,这个调用量很小 + instanceService.stopInstance(instance.getAppId(), instance.getInstanceId()); + } catch (Exception ignore) { + // ignore exception + } + }); + } + + private void calculateNextTriggerTime(JobInfoDO jobInfo) { + // 计算下次调度时间 + if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) { + // 固定频率类型的任务不计算 + jobInfo.setNextTriggerTime(null); + } else { + LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle()); + Long nextValidTime = timingStrategyService.calculateNextTriggerTimeWithInspection(TimeExpressionType.of(jobInfo.getTimeExpressionType()), jobInfo.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd()); + jobInfo.setNextTriggerTime(nextValidTime); + } + // 重写最后修改时间 + jobInfo.setGmtModified(new Date()); + } + + private void fillDefaultValue(JobInfoDO jobInfoDO) { + if (jobInfoDO.getMaxWorkerCount() == null) { + jobInfoDO.setMaxWorkerCount(0); + } + if (jobInfoDO.getMaxInstanceNum() == null) { + jobInfoDO.setMaxInstanceNum(0); + } + if (jobInfoDO.getConcurrency() == null) { + jobInfoDO.setConcurrency(5); + } + if (jobInfoDO.getInstanceRetryNum() == null) { + jobInfoDO.setInstanceRetryNum(0); + } + if (jobInfoDO.getTaskRetryNum() == null) { + jobInfoDO.setTaskRetryNum(0); + } + if (jobInfoDO.getInstanceTimeLimit() == null) { + jobInfoDO.setInstanceTimeLimit(0L); + } + } +} diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/JobController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/JobController.java index ab71491c..ba7a2118 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/JobController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/JobController.java @@ -49,6 +49,10 @@ public class JobController { return ResultDTO.success(JobInfoVO.from(jobService.copyJob(Long.valueOf(jobId)))); } + @GetMapping("/export") + public ResultDTO exportJob(String jobId) { + return ResultDTO.success(jobService.exportJob(Long.valueOf(jobId))); + } @GetMapping("/disable") public ResultDTO disableJob(String jobId) {