diff --git a/others/script/jenkins_auto_build.sh b/others/script/jenkins_auto_build.sh index 06a7b444..44d2a44a 100644 --- a/others/script/jenkins_auto_build.sh +++ b/others/script/jenkins_auto_build.sh @@ -25,7 +25,7 @@ docker run -d \ --restart=always \ --name powerjob-server \ -p 7700:7700 -p 10086:10086 \ - -e PARAMS="--spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://127.0.0.1:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --spring.data.mongodb.uri=mongodb://127.0.0.1:27017/powerjob-product" \ + -e PARAMS="--spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://124.70.67.79:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --spring.data.mongodb.uri=mongodb://124.70.67.79:27017/powerjob-product" \ -v ~/docker/powerjob-server:/root/powerjob-server -v ~/.m2:/root/.m2 \ tjqq/powerjob-server:latest echo "================== powerjob-client 启动完成 ==================" diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index 1f94bd77..5e57ad56 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -9,6 +9,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; +import com.github.kfcfans.powerjob.server.service.instance.InstanceMetaInfoService; import com.google.common.base.Splitter; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -39,6 +40,8 @@ public class DispatchService { @Resource private InstanceManager instanceManager; @Resource + private InstanceMetaInfoService instanceMetaInfoService; + @Resource private InstanceInfoRepository instanceInfoRepository; private static final Splitter commaSplitter = Splitter.on(","); @@ -142,5 +145,8 @@ public class DispatchService { // 修改状态 instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, dbInstanceParams, now); + + // 装载缓存 + instanceMetaInfoService.loadJobInfo(instanceId, jobInfo); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java index da3c03d5..bb3917b5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java @@ -11,7 +11,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.local.LocalInstanceLogDO; import com.github.kfcfans.powerjob.server.persistence.local.LocalInstanceLogRepository; import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager; -import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; +import com.github.kfcfans.powerjob.server.service.instance.InstanceMetaInfoService; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -46,7 +46,7 @@ import java.util.stream.Stream; public class InstanceLogService { @Resource - private InstanceManager instanceManager; + private InstanceMetaInfoService instanceMetaInfoService; @Resource private GridFsManager gridFsManager; // 本地数据库操作bean @@ -317,16 +317,17 @@ public class InstanceLogService { @Scheduled(fixedDelay = 60000) public void timingCheck() { + // TODO: 检查 lastReportTime,过期 instance 调用 sync 同步并删除 + // 定时删除秒级任务的日志 List frequentInstanceIds = Lists.newLinkedList(); instanceId2LastReportTime.keySet().forEach(instanceId -> { - JobInfoDO jobInfo = instanceManager.fetchJobInfo(instanceId); - if (jobInfo == null) { - return; - } - - if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) { - frequentInstanceIds.add(instanceId); + try { + JobInfoDO jobInfo = instanceMetaInfoService.fetchJobInfoByInstanceId(instanceId); + if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) { + frequentInstanceIds.add(instanceId); + } + }catch (Exception ignore) { } }); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java index ee85b297..cc2452a9 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java @@ -8,7 +8,6 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; -import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.service.DispatchService; import com.github.kfcfans.powerjob.server.service.InstanceLogService; import com.github.kfcfans.powerjob.server.service.UserService; @@ -16,8 +15,6 @@ import com.github.kfcfans.powerjob.server.service.alarm.Alarmable; import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarmContent; import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder; import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; @@ -25,7 +22,6 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Date; import java.util.List; -import java.util.Optional; import java.util.concurrent.TimeUnit; /** @@ -38,10 +34,6 @@ import java.util.concurrent.TimeUnit; @Service public class InstanceManager { - // 存储 instanceId 对应的 Job 信息,便于重试 - private static Cache instanceId2JobInfo; - - // Spring Bean @Resource private DispatchService dispatchService; @Resource @@ -49,21 +41,12 @@ public class InstanceManager { @Resource(name = "omsCenterAlarmService") private Alarmable omsCenterAlarmService; @Resource - private InstanceInfoRepository instanceInfoRepository; + private InstanceMetaInfoService instanceMetaInfoService; @Resource - private JobInfoRepository jobInfoRepository; + private InstanceInfoRepository instanceInfoRepository; @Resource private WorkflowInstanceManager workflowInstanceManager; - private static final int CACHE_CONCURRENCY_LEVEL = 8; - private static final int CACHE_MAX_SIZE = 4096; - - static { - instanceId2JobInfo = CacheBuilder.newBuilder() - .concurrencyLevel(CACHE_CONCURRENCY_LEVEL) - .maximumSize(CACHE_MAX_SIZE) - .build(); - } /** * 更新任务状态 @@ -71,14 +54,10 @@ public class InstanceManager { */ public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws Exception { - Long jobId = req.getJobId(); Long instanceId = req.getInstanceId(); // 获取相关数据 - JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId, () -> { - Optional jobInfoOpt = jobInfoRepository.findById(jobId); - return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobIno by jobId: " + jobId)); - }); + JobInfoDO jobInfo = instanceMetaInfoService.fetchJobInfoByInstanceId(req.getInstanceId()); InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); if (instanceInfo == null) { log.warn("[InstanceManager-{}] can't find InstanceInfo from database", instanceId); @@ -173,9 +152,11 @@ public class InstanceManager { // 告警 if (status == InstanceStatus.FAILED) { - JobInfoDO jobInfo = fetchJobInfo(instanceId); - if (jobInfo == null) { - log.warn("[InstanceManager] can't find jobInfo by instanceId({}), alarm failed.", instanceId); + JobInfoDO jobInfo; + try { + jobInfo = instanceMetaInfoService.fetchJobInfoByInstanceId(instanceId); + }catch (Exception e) { + log.warn("[InstanceManager-{}] can't find jobInfo, alarm failed.", instanceId); return; } @@ -187,32 +168,6 @@ public class InstanceManager { List userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds()); omsCenterAlarmService.onJobInstanceFailed(content, userList); } - - // 过期缓存 - instanceId2JobInfo.invalidate(instanceId); } - /** - * 根据任务实例ID查询任务相关信息 - * @param instanceId 任务实例ID - * @return 任务元数据 - */ - public JobInfoDO fetchJobInfo(Long instanceId) { - JobInfoDO jobInfo = instanceId2JobInfo.getIfPresent(instanceId); - if (jobInfo != null) { - return jobInfo; - } - InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); - if (instanceInfo != null) { - return jobInfoRepository.findById(instanceInfo.getJobId()).orElse(null); - } - return null; - } - - /** - * 释放本地缓存,防止内存泄漏 - */ - public static void releaseCache() { - instanceId2JobInfo.cleanUp(); - } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceMetaInfoService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceMetaInfoService.java new file mode 100644 index 00000000..2d81d65a --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceMetaInfoService.java @@ -0,0 +1,67 @@ +package com.github.kfcfans.powerjob.server.service.instance; + +import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; +import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +/** + * 存储 instance 对应的 JobInfo 信息 + * + * @author tjq + * @since 2020/6/23 + */ +@Service +public class InstanceMetaInfoService { + + @Resource + private JobInfoRepository jobInfoRepository; + @Resource + private InstanceInfoRepository instanceInfoRepository; + + // 缓存,一旦生成任务实例,其对应的 JobInfo 不应该再改变(即使源数据改变) + private Cache instanceId2JobInfoCache; + + private static final int CACHE_CONCURRENCY_LEVEL = 8; + private static final int CACHE_MAX_SIZE = 4096; + + public InstanceMetaInfoService() { + instanceId2JobInfoCache = CacheBuilder.newBuilder() + .concurrencyLevel(CACHE_CONCURRENCY_LEVEL) + .maximumSize(CACHE_MAX_SIZE) + .build(); + } + + /** + * 根据 instanceId 获取 JobInfo + * @param instanceId instanceId + * @return JobInfoDO + * @throws ExecutionException 异常 + */ + public JobInfoDO fetchJobInfoByInstanceId(Long instanceId) throws ExecutionException { + return instanceId2JobInfoCache.get(instanceId, () -> { + InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); + if (instanceInfo != null) { + Optional jobInfoOpt = jobInfoRepository.findById(instanceInfo.getJobId()); + return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobInfo by jobId: " + instanceInfo.getJobId())); + } + throw new IllegalArgumentException("can't find Instance by instanceId: " + instanceId); + }); + } + + /** + * 装载缓存 + * @param instanceId instanceId + * @param jobInfoDO 原始的任务数据 + */ + public void loadJobInfo(Long instanceId, JobInfoDO jobInfoDO) { + instanceId2JobInfoCache.put(instanceId, jobInfoDO); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index e3b9fd88..a74ca467 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -60,7 +60,6 @@ public class CleanService { // 释放本地缓存 WorkerManagerService.releaseContainerInfos(); - InstanceManager.releaseCache(); // 删除数据库运行记录 cleanInstanceLog();