fix the bug of instance would lost it's instancePrams when hava a retry

This commit is contained in:
tjq 2020-04-30 14:12:35 +08:00
parent e405e283ad
commit f951c9f31c
9 changed files with 64 additions and 24 deletions

View File

@ -30,6 +30,8 @@ public class InstanceInfoDO {
private Long appId; private Long appId;
// 任务实例ID // 任务实例ID
private Long instanceId; private Long instanceId;
// 任务实例参数
private String instanceParams;
/** /**
* 任务状态 {@link com.github.kfcfans.common.InstanceStatus} * 任务状态 {@link com.github.kfcfans.common.InstanceStatus}
*/ */

View File

@ -39,14 +39,14 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
@Transactional @Transactional
@Modifying @Modifying
@CanIgnoreReturnValue @CanIgnoreReturnValue
@Query(value = "update instance_log set status = ?2, running_times = ?3, actual_trigger_time = ?4, finished_time = ?5, task_tracker_address = ?6, result = ?7, gmt_modified = now() where instance_id = ?1", nativeQuery = true) @Query(value = "update instance_log set status = ?2, running_times = ?3, actual_trigger_time = ?4, finished_time = ?5, task_tracker_address = ?6, result = ?7, instance_params = ?8, gmt_modified = now() where instance_id = ?1", nativeQuery = true)
int update4TriggerFailed(long instanceId, int status, long runningTimes, long actualTriggerTime, long finishedTime, String taskTrackerAddress, String result); int update4TriggerFailed(long instanceId, int status, long runningTimes, long actualTriggerTime, long finishedTime, String taskTrackerAddress, String result, String instanceParams);
@Transactional @Transactional
@Modifying @Modifying
@CanIgnoreReturnValue @CanIgnoreReturnValue
@Query(value = "update instance_log set status = ?2, running_times = ?3, actual_trigger_time = ?4, task_tracker_address = ?5, gmt_modified = now() where instance_id = ?1", nativeQuery = true) @Query(value = "update instance_log set status = ?2, running_times = ?3, actual_trigger_time = ?4, task_tracker_address = ?5, instance_params = ?6, gmt_modified = now() where instance_id = ?1", nativeQuery = true)
int update4TriggerSucceed(long instanceId, int status, long runningTimes, long actualTriggerTime, String taskTrackerAddress); int update4TriggerSucceed(long instanceId, int status, long runningTimes, long actualTriggerTime, String taskTrackerAddress, String instanceParams);
@Modifying @Modifying
@Transactional @Transactional

View File

@ -39,8 +39,9 @@ public class DispatchService {
private static final Splitter commaSplitter = Splitter.on(","); private static final Splitter commaSplitter = Splitter.on(",");
public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) { public void redispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) {
dispatch(jobInfo, instanceId, currentRunningTimes, null); String instanceParams = instanceInfoRepository.findByInstanceId(instanceId).getInstanceParams();
dispatch(jobInfo, instanceId, currentRunningTimes, instanceParams);
} }
/** /**
@ -52,7 +53,10 @@ public class DispatchService {
*/ */
public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes, String instanceParams) { public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes, String instanceParams) {
Long jobId = jobInfo.getId(); Long jobId = jobInfo.getId();
log.info("[DispatchService] start to dispatch job: {}.", jobInfo); log.info("[DispatchService] start to dispatch job: {};instancePrams: {}.", jobInfo, instanceParams);
String dbInstanceParams = instanceParams == null ? "" : instanceParams;
// 查询当前运行的实例数 // 查询当前运行的实例数
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus); long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus);
@ -61,7 +65,7 @@ public class DispatchService {
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum()); String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
log.warn("[DispatchService] cancel dispatch job(jobId={}) due to too much instance(num={}) is running.", jobId, runningInstanceCount); log.warn("[DispatchService] cancel dispatch job(jobId={}) due to too much instance(num={}) is running.", jobId, runningInstanceCount);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams);
return; return;
} }
@ -84,7 +88,7 @@ public class DispatchService {
if (CollectionUtils.isEmpty(finalWorkers)) { if (CollectionUtils.isEmpty(finalWorkers)) {
String clusterStatusDescription = WorkerManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId()); String clusterStatusDescription = WorkerManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId());
log.warn("[DispatchService] cancel dispatch job(jobId={}) due to no worker available, clusterStatus is {}.", jobId, clusterStatusDescription); log.warn("[DispatchService] cancel dispatch job(jobId={}) due to no worker available, clusterStatus is {}.", jobId, clusterStatusDescription);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, dbInstanceParams);
return; return;
} }
@ -103,7 +107,12 @@ public class DispatchService {
BeanUtils.copyProperties(jobInfo, req); BeanUtils.copyProperties(jobInfo, req);
// 传入 JobId // 传入 JobId
req.setJobId(jobInfo.getId()); req.setJobId(jobInfo.getId());
req.setInstanceParams(instanceParams); // 传入 InstanceParams
if (StringUtils.isEmpty(instanceParams)) {
req.setInstanceParams(null);
}else {
req.setInstanceParams(instanceParams);
}
req.setInstanceId(instanceId); req.setInstanceId(instanceId);
req.setAllWorkerAddress(finalWorkers); req.setAllWorkerAddress(finalWorkers);
@ -122,6 +131,6 @@ public class DispatchService {
log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString()); log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString());
// 修改状态 // 修改状态
instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress); instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, dbInstanceParams);
} }
} }

View File

@ -0,0 +1,21 @@
package com.github.kfcfans.oms.server.service.alarm;
import lombok.Data;
/**
* 告警对象
*
* @author tjq
* @since 2020/4/30
*/
@Data
public class AlarmContent {
// 应用ID
private long appId;
// 任务ID
private long jobId;
// 任务实例ID
private long instanceId;
// 任务名称
private String jobName;
}

View File

@ -108,7 +108,6 @@ public class InstanceManager {
updateEntity.setFinishedTime(System.currentTimeMillis()); updateEntity.setFinishedTime(System.currentTimeMillis());
finished = true; finished = true;
log.info("[InstanceManager] instance(instanceId={}) execute succeed.", instanceId);
}else if (newStatus == InstanceStatus.FAILED) { }else if (newStatus == InstanceStatus.FAILED) {
// 当前重试次数 <= 最大重试次数进行重试 第一次运行runningTimes为1重试一次instanceRetryNum也为1故需要 = // 当前重试次数 <= 最大重试次数进行重试 第一次运行runningTimes为1重试一次instanceRetryNum也为1故需要 =
@ -118,7 +117,7 @@ public class InstanceManager {
// 延迟10S重试由于重试不改变 instanceId如果派发到同一台机器上一个 TaskTracker 还处于资源释放阶段无法创建新的TaskTracker任务失败 // 延迟10S重试由于重试不改变 instanceId如果派发到同一台机器上一个 TaskTracker 还处于资源释放阶段无法创建新的TaskTracker任务失败
HashedWheelTimerHolder.TIMER.schedule(() -> { HashedWheelTimerHolder.TIMER.schedule(() -> {
getDispatchService().dispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes()); getDispatchService().redispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes());
}, 10, TimeUnit.SECONDS); }, 10, TimeUnit.SECONDS);
// 修改状态为 等待派发正式开始重试 // 修改状态为 等待派发正式开始重试
@ -136,25 +135,34 @@ public class InstanceManager {
getInstanceInfoRepository().saveAndFlush(updateEntity); getInstanceInfoRepository().saveAndFlush(updateEntity);
if (finished) { if (finished) {
processFinishedInstance(instanceId); processFinishedInstance(instanceId, updateEntity.getStatus());
} }
} }
/** /**
* 收尾完成的任务实例 * 收尾完成的任务实例
* @param instanceId 任务实例ID * @param instanceId 任务实例ID
* @param status 任务实例状态成功/失败/手动停止
*/ */
public static void processFinishedInstance(Long instanceId) { public static void processFinishedInstance(Long instanceId, int status) {
log.info("[InstanceManager] instance(id={}) process finished.", instanceId); InstanceStatus instanceStatus = InstanceStatus.of(status);
log.info("[InstanceManager] instance(id={}) process finished, current status is {}.", instanceId, instanceStatus);
// 清除已完成的实例信息 // 清除已完成的实例信息
instanceId2StatusHolder.remove(instanceId); instanceId2StatusHolder.remove(instanceId);
// 这一步也可能导致后面取不到 JobInfoDO // 这一步也可能导致后面取不到 JobInfoDO
instanceId2JobInfo.remove(instanceId); JobInfoDO jobInfo = instanceId2JobInfo.remove(instanceId);
// 上报日志数据 // 上报日志数据
getInstanceLogService().sync(instanceId); getInstanceLogService().sync(instanceId);
// 告警
if (instanceStatus == InstanceStatus.FAILED) {
InstanceInfoDO instanceInfo = getInstanceInfoRepository().findByInstanceId(instanceId);
}
} }
public static JobInfoDO fetchJobInfo(Long instanceId) { public static JobInfoDO fetchJobInfo(Long instanceId) {

View File

@ -64,7 +64,7 @@ public class InstanceService {
instanceInfoDO.setResult(SystemInstanceResult.STOPPED_BY_USER); instanceInfoDO.setResult(SystemInstanceResult.STOPPED_BY_USER);
instanceInfoRepository.saveAndFlush(instanceInfoDO); instanceInfoRepository.saveAndFlush(instanceInfoDO);
InstanceManager.processFinishedInstance(instanceId); InstanceManager.processFinishedInstance(instanceId, STOPPED.getV());
/* /*
不可靠通知停止 TaskTracker 不可靠通知停止 TaskTracker

View File

@ -89,7 +89,7 @@ public class InstanceStatusCheckService {
// 重新派发(orElseGet用于消除编译器警告...) // 重新派发(orElseGet用于消除编译器警告...)
JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new);
dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), 0); dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), 0);
}); });
} }
@ -101,7 +101,7 @@ public class InstanceStatusCheckService {
waitingWorkerReceiveInstances.forEach(instance -> { waitingWorkerReceiveInstances.forEach(instance -> {
// 重新派发 // 重新派发
JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new);
dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), 0); dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), 0);
}); });
} }
@ -124,7 +124,7 @@ public class InstanceStatusCheckService {
// CRON API一样失败次数 + 1根据重试配置进行重试 // CRON API一样失败次数 + 1根据重试配置进行重试
if (instance.getRunningTimes() > jobInfoDO.getInstanceRetryNum()) { if (instance.getRunningTimes() > jobInfoDO.getInstanceRetryNum()) {
dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes());
}else { }else {
updateFailedInstance(instance); updateFailedInstance(instance);
} }
@ -147,6 +147,6 @@ public class InstanceStatusCheckService {
instance.setResult(SystemInstanceResult.REPORT_TIMEOUT); instance.setResult(SystemInstanceResult.REPORT_TIMEOUT);
instanceInfoRepository.saveAndFlush(instance); instanceInfoRepository.saveAndFlush(instance);
InstanceManager.processFinishedInstance(instance.getInstanceId()); InstanceManager.processFinishedInstance(instance.getInstanceId(), InstanceStatus.FAILED.getV());
} }
} }

View File

@ -153,7 +153,7 @@ public class JobScheduleService {
} }
HashedWheelTimerHolder.TIMER.schedule(() -> { HashedWheelTimerHolder.TIMER.schedule(() -> {
dispatchService.dispatch(jobInfoDO, instanceId, 0); dispatchService.dispatch(jobInfoDO, instanceId, 0, null);
}, delay, TimeUnit.MILLISECONDS); }, delay, TimeUnit.MILLISECONDS);
}); });

View File

@ -118,7 +118,7 @@ public abstract class TaskTracker {
/* *************************** 对外方法区 *************************** */ /* *************************** 对外方法区 *************************** */
/** /**
* 更新Task状态 * 更新Task状态
* V1.0.0 -> V1.0.1 锁方案变更 synchronized (taskId.intern()) 修改为分段锁能大大减少内存占用损失的只有理论并发度而已 * V1.0.0 -> V1.0.1e405e283ad7f97b0b4e5d369c7de884c0caf9192 锁方案变更 synchronized (taskId.intern()) 修改为分段锁能大大减少内存占用损失的只有理论并发度而已
* @param taskId task的IDtask为任务实例的执行单位 * @param taskId task的IDtask为任务实例的执行单位
* @param newStatus task的新状态 * @param newStatus task的新状态
* @param reportTime 上报时间 * @param reportTime 上报时间