From f951c9f31c02f01bb90ce521fa70bd1f81d7c1e5 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 30 Apr 2020 14:12:35 +0800 Subject: [PATCH] fix the bug of instance would lost it's instancePrams when hava a retry --- .../core/model/InstanceInfoDO.java | 2 ++ .../repository/InstanceInfoRepository.java | 8 +++---- .../oms/server/service/DispatchService.java | 23 +++++++++++++------ .../server/service/alarm/AlarmContent.java | 21 +++++++++++++++++ .../service/instance/InstanceManager.java | 20 +++++++++++----- .../service/instance/InstanceService.java | 2 +- .../timing/InstanceStatusCheckService.java | 8 +++---- .../timing/schedule/JobScheduleService.java | 2 +- .../worker/core/tracker/task/TaskTracker.java | 2 +- 9 files changed, 64 insertions(+), 24 deletions(-) create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/alarm/AlarmContent.java diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/InstanceInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/InstanceInfoDO.java index b7414485..42dd783c 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/InstanceInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/InstanceInfoDO.java @@ -30,6 +30,8 @@ public class InstanceInfoDO { private Long appId; // 任务实例ID private Long instanceId; + // 任务实例参数 + private String instanceParams; /** * 任务状态 {@link com.github.kfcfans.common.InstanceStatus} */ diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/InstanceInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/InstanceInfoRepository.java index 630f6cfc..383ce61d 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/InstanceInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/InstanceInfoRepository.java @@ -39,14 +39,14 @@ public interface InstanceInfoRepository extends JpaRepository jobInfo.getMaxInstanceNum()) { String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum()); log.warn("[DispatchService] cancel dispatch job(jobId={}) due to too much instance(num={}) is running.", jobId, runningInstanceCount); - instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result); + instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams); return; } @@ -84,7 +88,7 @@ public class DispatchService { if (CollectionUtils.isEmpty(finalWorkers)) { String clusterStatusDescription = WorkerManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId()); log.warn("[DispatchService] cancel dispatch job(jobId={}) due to no worker available, clusterStatus is {}.", jobId, clusterStatusDescription); - instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE); + instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, dbInstanceParams); return; } @@ -103,7 +107,12 @@ public class DispatchService { BeanUtils.copyProperties(jobInfo, req); // 传入 JobId req.setJobId(jobInfo.getId()); - req.setInstanceParams(instanceParams); + // 传入 InstanceParams + if (StringUtils.isEmpty(instanceParams)) { + req.setInstanceParams(null); + }else { + req.setInstanceParams(instanceParams); + } req.setInstanceId(instanceId); req.setAllWorkerAddress(finalWorkers); @@ -122,6 +131,6 @@ public class DispatchService { log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString()); // 修改状态 - instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress); + instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, dbInstanceParams); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/alarm/AlarmContent.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/alarm/AlarmContent.java new file mode 100644 index 00000000..198f7e27 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/alarm/AlarmContent.java @@ -0,0 +1,21 @@ +package com.github.kfcfans.oms.server.service.alarm; + +import lombok.Data; + +/** + * 告警对象 + * + * @author tjq + * @since 2020/4/30 + */ +@Data +public class AlarmContent { + // 应用ID + private long appId; + // 任务ID + private long jobId; + // 任务实例ID + private long instanceId; + // 任务名称 + private String jobName; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java index eb40fb09..4292e41c 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java @@ -108,7 +108,6 @@ public class InstanceManager { updateEntity.setFinishedTime(System.currentTimeMillis()); finished = true; - log.info("[InstanceManager] instance(instanceId={}) execute succeed.", instanceId); }else if (newStatus == InstanceStatus.FAILED) { // 当前重试次数 <= 最大重试次数,进行重试 (第一次运行,runningTimes为1,重试一次,instanceRetryNum也为1,故需要 =) @@ -118,7 +117,7 @@ public class InstanceManager { // 延迟10S重试(由于重试不改变 instanceId,如果派发到同一台机器,上一个 TaskTracker 还处于资源释放阶段,无法创建新的TaskTracker,任务失败) HashedWheelTimerHolder.TIMER.schedule(() -> { - getDispatchService().dispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes()); + getDispatchService().redispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes()); }, 10, TimeUnit.SECONDS); // 修改状态为 等待派发,正式开始重试 @@ -136,25 +135,34 @@ public class InstanceManager { getInstanceInfoRepository().saveAndFlush(updateEntity); if (finished) { - processFinishedInstance(instanceId); + processFinishedInstance(instanceId, updateEntity.getStatus()); } } /** * 收尾完成的任务实例 * @param instanceId 任务实例ID + * @param status 任务实例状态:成功/失败/手动停止 */ - public static void processFinishedInstance(Long instanceId) { + public static void processFinishedInstance(Long instanceId, int status) { - log.info("[InstanceManager] instance(id={}) process finished.", instanceId); + InstanceStatus instanceStatus = InstanceStatus.of(status); + log.info("[InstanceManager] instance(id={}) process finished, current status is {}.", instanceId, instanceStatus); // 清除已完成的实例信息 instanceId2StatusHolder.remove(instanceId); // 这一步也可能导致后面取不到 JobInfoDO - instanceId2JobInfo.remove(instanceId); + JobInfoDO jobInfo = instanceId2JobInfo.remove(instanceId); // 上报日志数据 getInstanceLogService().sync(instanceId); + + // 告警 + if (instanceStatus == InstanceStatus.FAILED) { + + InstanceInfoDO instanceInfo = getInstanceInfoRepository().findByInstanceId(instanceId); + + } } public static JobInfoDO fetchJobInfo(Long instanceId) { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java index f2e0471b..a380d099 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java @@ -64,7 +64,7 @@ public class InstanceService { instanceInfoDO.setResult(SystemInstanceResult.STOPPED_BY_USER); instanceInfoRepository.saveAndFlush(instanceInfoDO); - InstanceManager.processFinishedInstance(instanceId); + InstanceManager.processFinishedInstance(instanceId, STOPPED.getV()); /* 不可靠通知停止 TaskTracker diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java index cb933714..2d669e12 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java @@ -89,7 +89,7 @@ public class InstanceStatusCheckService { // 重新派发(orElseGet用于消除编译器警告...) JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); - dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), 0); + dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), 0); }); } @@ -101,7 +101,7 @@ public class InstanceStatusCheckService { waitingWorkerReceiveInstances.forEach(instance -> { // 重新派发 JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); - dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), 0); + dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), 0); }); } @@ -124,7 +124,7 @@ public class InstanceStatusCheckService { // CRON 和 API一样,失败次数 + 1,根据重试配置进行重试 if (instance.getRunningTimes() > jobInfoDO.getInstanceRetryNum()) { - dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); + dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); }else { updateFailedInstance(instance); } @@ -147,6 +147,6 @@ public class InstanceStatusCheckService { instance.setResult(SystemInstanceResult.REPORT_TIMEOUT); instanceInfoRepository.saveAndFlush(instance); - InstanceManager.processFinishedInstance(instance.getInstanceId()); + InstanceManager.processFinishedInstance(instance.getInstanceId(), InstanceStatus.FAILED.getV()); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java index 46ed0e02..a1535a2d 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java @@ -153,7 +153,7 @@ public class JobScheduleService { } HashedWheelTimerHolder.TIMER.schedule(() -> { - dispatchService.dispatch(jobInfoDO, instanceId, 0); + dispatchService.dispatch(jobInfoDO, instanceId, 0, null); }, delay, TimeUnit.MILLISECONDS); }); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index d1ca0d9f..efc7907a 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -118,7 +118,7 @@ public abstract class TaskTracker { /* *************************** 对外方法区 *************************** */ /** * 更新Task状态 - * V1.0.0 -> V1.0.1 锁方案变更,从 synchronized (taskId.intern()) 修改为分段锁,能大大减少内存占用,损失的只有理论并发度而已 + * V1.0.0 -> V1.0.1(e405e283ad7f97b0b4e5d369c7de884c0caf9192) 锁方案变更,从 synchronized (taskId.intern()) 修改为分段锁,能大大减少内存占用,损失的只有理论并发度而已 * @param taskId task的ID(task为任务实例的执行单位) * @param newStatus task的新状态 * @param reportTime 上报时间