feat: support frequent task alarm #370

This commit is contained in:
Echo009 2022-01-21 17:21:10 +08:00
parent d7c0d12a30
commit 8aa5140265
21 changed files with 341 additions and 102 deletions

1
others/update-202201.sql Normal file
View File

@ -0,0 +1 @@
alter table sx_job_info add alarm_config varchar(512) comment '告警配置' default null;

View File

@ -0,0 +1,28 @@
package tech.powerjob.common.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author Echo009
* @since 2022/1/25
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AlarmConfig {
/**
* 触发告警的阈值
*/
private Integer alertThreshold;
/**
* 统计的窗口长度s
*/
private Integer statisticWindowLen;
/**
* 沉默时间窗口s
*/
private Integer silenceWindowLen;
}

View File

@ -15,7 +15,9 @@ import java.util.List;
@Data
public class ServerScheduleJobReq implements PowerSerializable {
// 可用处理器地址可能多值逗号分隔
/**
* 可用处理器地址可能多值逗号分隔
*/
private List<String> allWorkerAddress;
/* *********************** 任务相关属性 *********************** */
@ -32,46 +34,62 @@ public class ServerScheduleJobReq implements PowerSerializable {
private Long instanceId;
/**
* 任务执行处理器信息
* 任务执行类型单机广播MR
*/
// 任务执行类型单机广播MR
private String executeType;
// 处理器类型JavaBeanJar脚本等
/**
* 处理器类型内建外部
*/
private String processorType;
// 处理器信息
/**
* 处理器信息
*/
private String processorInfo;
/**
* 超时时间
* 整个任务的总体超时时间
*/
// 整个任务的总体超时时间
private long instanceTimeoutMS;
/**
* 任务运行参数
* 任务级别的参数相当于类的static变量
*/
// 任务级别的参数相当于类的static变量
private String jobParams;
// 实例级别的参数相当于类的普通变量API触发专用从API触发处带入
/**
* 实例级别的参数相当于类的普通变量API触发专用从API触发处带入
*/
private String instanceParams;
// 每台机器的处理线程数上限
/**
* 每台机器的处理线程数上限
*/
private int threadConcurrency;
// 子任务重试次数任务本身的重试机制由server控制
/**
* 子任务重试次数任务本身的重试机制由server控制
*/
private int taskRetryNum;
/**
* 定时执行信息
* 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
*/
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
private String timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG单位MS
/**
* 时间表达式CRON/NULL/LONG/LONG单位MS
*/
private String timeExpression;
// 最大同时运行任务数默认 1
/**
* 最大同时运行任务数默认 1
*/
private Integer maxInstanceNum;
/**
* 告警配置
*/
private String alarmConfig;
@Override
public String path() {
return ProtocolConstant.WORKER_PATH_DISPATCH_JOB;

View File

@ -43,4 +43,13 @@ public class TaskTrackerReportInstanceStatusReq implements PowerSerializable {
private long reportTime;
private String sourceAddress;
/* ********* 秒级任务的告警信息 ********* */
private boolean needAlert;
private String alertContent;
}

View File

@ -4,6 +4,7 @@ import tech.powerjob.common.enums.DispatchStrategy;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.utils.CommonUtils;
import lombok.Data;
import tech.powerjob.common.response.JobInfoDTO;
@ -132,6 +133,10 @@ public class SaveJobInfoRequest {
private DispatchStrategy dispatchStrategy;
private String lifecycle;
/**
* alarm config
*/
private AlarmConfig alarmConfig;
/**

View File

@ -1,6 +1,7 @@
package tech.powerjob.common.response;
import lombok.Data;
import tech.powerjob.common.model.AlarmConfig;
import java.util.Date;
@ -16,64 +17,105 @@ public class JobInfoDTO {
private Long id;
/* ************************** 任务基本信息 ************************** */
// 任务名称
/**
* 任务名称
*/
private String jobName;
// 任务描述
/**
* 任务描述
*/
private String jobDescription;
// 任务所属的应用ID
/**
* 任务所属的应用ID
*/
private Long appId;
// 任务自带的参数
/**
* 任务自带的参数
*/
private String jobParams;
/* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
/**
* 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
*/
private Integer timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
/**
* 时间表达式CRON/NULL/LONG/LONG
*/
private String timeExpression;
/* ************************** 执行方式 ************************** */
// 执行类型单机/广播/MR
/**
* 执行类型单机/广播/MR
*/
private Integer executeType;
// 执行器类型Java/Shell
/**
* 执行器类型Java/Shell
*/
private Integer processorType;
// 执行器信息
/**
* 执行器信息
*/
private String processorInfo;
/* ************************** 运行时配置 ************************** */
// 最大同时运行任务数默认 1
/**
* 最大同时运行任务数默认 1
*/
private Integer maxInstanceNum;
// 并发度同时执行某个任务的最大线程数量
/**
* 并发度同时执行某个任务的最大线程数量
*/
private Integer concurrency;
// 任务整体超时时间
/**
* 任务整体超时时间
*/
private Long instanceTimeLimit;
/* ************************** 重试配置 ************************** */
/** ************************** 重试配置 ************************** */
private Integer instanceRetryNum;
private Integer taskRetryNum;
// 1 正常运行2 停止不再调度
/**
* 1 正常运行2 停止不再调度
*/
private Integer status;
// 下一次调度时间
/**
* 下一次调度时间
*/
private Long nextTriggerTime;
/* ************************** 繁忙机器配置 ************************** */
// 最低CPU核心数量0代表不限
/**
* 最低CPU核心数量0代表不限
*/
private double minCpuCores;
// 最低内存空间单位 GB0代表不限
/**
* 最低内存空间单位 GB0代表不限
*/
private double minMemorySpace;
// 最低磁盘空间单位 GB0代表不限
/**
* 最低磁盘空间单位 GB0代表不限
*/
private double minDiskSpace;
/* ************************** 集群配置 ************************** */
// 指定机器运行空代表不限非空则只会使用其中的机器运行多值逗号分割
/**
* 指定机器运行空代表不限非空则只会使用其中的机器运行多值逗号分割
*/
private String designatedWorkers;
// 最大机器数量
/**
* 最大机器数量
*/
private Integer maxWorkerCount;
// 报警用户ID列表多值逗号分隔
/**
* 报警用户ID列表多值逗号分隔
*/
private String notifyUserIds;
private Date gmtCreate;
private Date gmtModified;
private String extra;
@ -81,4 +123,6 @@ public class JobInfoDTO {
private Integer dispatchStrategy;
private String lifecycle;
private AlarmConfig alarmConfig;
}

View File

@ -1,21 +1,22 @@
package tech.powerjob.server.core.instance;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.server.common.timewheel.holder.HashedWheelTimerHolder;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.service.UserService;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.extension.defaultimpl.alarm.AlarmCenter;
import tech.powerjob.server.extension.defaultimpl.alarm.module.JobInstanceAlarm;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.core.service.UserService;
import tech.powerjob.server.extension.defaultimpl.alram.AlarmCenter;
import tech.powerjob.server.extension.defaultimpl.alram.module.JobInstanceAlarm;
import tech.powerjob.server.common.timewheel.holder.HashedWheelTimerHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
@ -91,6 +92,11 @@ public class InstanceManager {
instanceInfo.setResult(req.getResult());
instanceInfo.setRunningTimes(req.getTotalTaskNum());
instanceInfoRepository.saveAndFlush(instanceInfo);
// 任务需要告警
if (req.isNeedAlert()) {
log.info("[InstanceManager-{}] receive frequent task alert req,time:{},content:{}",instanceId,req.getReportTime(),req.getAlertContent());
alert(instanceId, req.getAlertContent());
}
return;
}
// 更新运行次数
@ -134,7 +140,9 @@ public class InstanceManager {
if (finished) {
// 这里的 InstanceStatus 只有 成功/失败 两种手动停止不会由 TaskTracker 上报
processFinishedInstance(instanceId, req.getWfInstanceId(), receivedInstanceStatus, req.getResult());
}
}
/**
@ -160,28 +168,29 @@ public class InstanceManager {
// 告警
if (status == InstanceStatus.FAILED) {
JobInfoDO jobInfo;
try {
jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(instanceId);
} catch (Exception e) {
log.warn("[InstanceManager-{}] can't find jobInfo, alarm failed.", instanceId);
return;
}
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
JobInstanceAlarm content = new JobInstanceAlarm();
BeanUtils.copyProperties(jobInfo, content);
// 清理数据库后可能导致 instanceInfo 为空进而导致 NPE
if (instanceInfo != null) {
BeanUtils.copyProperties(instanceInfo, content);
}
List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds());
alarmCenter.alarmFailed(content, userList);
alert(instanceId, result);
}
// 主动移除缓存减小内存占用
instanceMetadataService.invalidateJobInfo(instanceId);
}
private void alert(Long instanceId, String alertContent) {
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
JobInfoDO jobInfo;
try {
jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(instanceId);
} catch (Exception e) {
log.warn("[InstanceManager-{}] can't find jobInfo, alarm failed.", instanceId);
return;
}
JobInstanceAlarm content = new JobInstanceAlarm();
BeanUtils.copyProperties(jobInfo, content);
BeanUtils.copyProperties(instanceInfo, content);
List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds());
if (!StringUtils.isEmpty(alertContent)) {
content.setResult(alertContent);
}
alarmCenter.alarmFailed(content, userList);
}
}

View File

@ -1,9 +1,11 @@
package tech.powerjob.server.core.service;
import com.alibaba.fastjson.JSON;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.response.JobInfoDTO;
import tech.powerjob.server.common.SJ;
@ -57,7 +59,7 @@ public class JobService {
*
* @param request 任务请求
* @return 创建的任务IDjobId
* @exception ParseException 异常
* @throws ParseException 异常
*/
public Long saveJob(SaveJobInfoRequest request) throws ParseException {
@ -91,6 +93,15 @@ public class JobService {
calculateNextTriggerTime(jobInfoDO);
if (request.getId() == null) {
jobInfoDO.setGmtCreate(new Date());
}
// 检查告警配置
if (request.getAlarmConfig() != null) {
AlarmConfig config = request.getAlarmConfig();
if (config.getStatisticWindowLen() == null || config.getAlertThreshold() == null || config.getSilenceWindowLen() == null) {
throw new PowerJobException("illegal alarm config!");
}
jobInfoDO.setAlarmConfig(JSON.toJSONString(request.getAlarmConfig()));
}
JobInfoDO res = jobInfoRepository.saveAndFlush(jobInfoDO);
return res.getId();
@ -98,6 +109,7 @@ public class JobService {
/**
* 复制任务
*
* @param jobId 目标任务ID
* @return 复制后的任务 ID
*/
@ -114,7 +126,7 @@ public class JobService {
fillDefaultValue(copyJob);
// 修正创建时间以及更新时间
copyJob.setId(null);
copyJob.setJobName(copyJob.getJobName()+"_COPY");
copyJob.setJobName(copyJob.getJobName() + "_COPY");
copyJob.setGmtCreate(new Date());
copyJob.setGmtModified(new Date());
@ -184,7 +196,7 @@ public class JobService {
* 启用某个任务
*
* @param jobId 任务ID
* @exception ParseException 异常CRON表达式错误
* @throws ParseException 异常CRON表达式错误
*/
public void enableJob(Long jobId) throws ParseException {
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId));
@ -275,6 +287,9 @@ public class JobService {
private static JobInfoDTO convert(JobInfoDO jobInfoDO) {
JobInfoDTO jobInfoDTO = new JobInfoDTO();
BeanUtils.copyProperties(jobInfoDO, jobInfoDTO);
if (jobInfoDO.getAlarmConfig() != null) {
jobInfoDTO.setAlarmConfig(JSON.parseObject(jobInfoDO.getAlarmConfig(), AlarmConfig.class));
}
return jobInfoDTO;
}

View File

@ -24,8 +24,8 @@ import tech.powerjob.server.core.service.UserService;
import tech.powerjob.server.core.service.WorkflowNodeHandleService;
import tech.powerjob.server.core.uid.IdGenerateService;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.extension.defaultimpl.alram.AlarmCenter;
import tech.powerjob.server.extension.defaultimpl.alram.module.WorkflowInstanceAlarm;
import tech.powerjob.server.extension.defaultimpl.alarm.AlarmCenter;
import tech.powerjob.server.extension.defaultimpl.alarm.module.WorkflowInstanceAlarm;
import tech.powerjob.server.persistence.remote.model.*;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;

View File

@ -1,7 +1,7 @@
package tech.powerjob.server.extension;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alram.module.Alarm;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import java.util.List;

View File

@ -1,8 +1,8 @@
package tech.powerjob.server.extension.defaultimpl.alram;
package tech.powerjob.server.extension.defaultimpl.alarm;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import tech.powerjob.server.extension.defaultimpl.alram.module.Alarm;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import com.google.common.collect.Lists;

View File

@ -1,4 +1,4 @@
package tech.powerjob.server.extension.defaultimpl.alram.impl;
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.exception.PowerJobException;
@ -6,7 +6,7 @@ import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alram.module.Alarm;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

View File

@ -1,4 +1,4 @@
package tech.powerjob.server.extension.defaultimpl.alram.impl;
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;

View File

@ -1,7 +1,7 @@
package tech.powerjob.server.extension.defaultimpl.alram.impl;
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alram.module.Alarm;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

View File

@ -1,10 +1,10 @@
package tech.powerjob.server.extension.defaultimpl.alram.impl;
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import com.alibaba.fastjson.JSONObject;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alram.module.Alarm;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;

View File

@ -1,4 +1,4 @@
package tech.powerjob.server.extension.defaultimpl.alram.module;
package tech.powerjob.server.extension.defaultimpl.alarm.module;
import com.alibaba.fastjson.JSONObject;
import tech.powerjob.common.OmsConstant;

View File

@ -1,4 +1,4 @@
package tech.powerjob.server.extension.defaultimpl.alram.module;
package tech.powerjob.server.extension.defaultimpl.alarm.module;
import tech.powerjob.common.model.PEWorkflowDAG;
import lombok.Data;

View File

@ -140,5 +140,9 @@ public class JobInfoDO {
private Integer dispatchStrategy;
private String lifecycle;
/**
* 告警配置
*/
private String alarmConfig;
}

View File

@ -1,6 +1,6 @@
package tech.powerjob.server.test;
import tech.powerjob.server.extension.defaultimpl.alram.impl.DingTalkUtils;
import tech.powerjob.server.extension.defaultimpl.alarm.impl.DingTalkUtils;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;

View File

@ -1,11 +1,13 @@
package tech.powerjob.worker.core.tracker.task;
import akka.actor.ActorSelection;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
@ -13,9 +15,11 @@ import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
@ -23,10 +27,7 @@ import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.LRUCache;
import tech.powerjob.worker.persistence.TaskDO;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -43,24 +44,40 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class FrequentTaskTracker extends TaskTracker {
// 时间表达式类型
/**
* 时间表达式类型
*/
private TimeExpressionType timeExpressionType;
private long timeParams;
// 最大同时运行实例数
/**
* 最大同时运行实例数
*/
private int maxInstanceNum;
// 总运行次数正常情况不会出现锁竞争直接用 Atomic 系列锁竞争严重推荐 LongAdder
/**
* 总运行次数正常情况不会出现锁竞争直接用 Atomic 系列锁竞争严重推荐 LongAdder
*/
private AtomicLong triggerTimes;
private AtomicLong succeedTimes;
private AtomicLong failedTimes;
// 任务发射器
private AtomicLong succeedTimes;
private AtomicLong failedTimes;
/**
* 任务发射器
*/
private Launcher launcher;
// 保存最近10个子任务的信息供用户查询user -> server -> worker 传递查询
/**
* 保存最近10个子任务的信息供用户查询user -> server -> worker 传递查询
*/
private LRUCache<Long, SubInstanceInfo> recentSubInstanceInfo;
// 保存运行中的任务
/**
* 保存运行中的任务
*/
private Map<Long, SubInstanceTimeHolder> subInstanceId2TimeHolder;
private AlertManager alertManager;
private static final int HISTORY_SIZE = 10;
private static final String LAST_TASK_ID_PREFIX = "L";
private static final int MIN_INTERVAL = 50;
@ -88,7 +105,7 @@ public class FrequentTaskTracker extends TaskTracker {
String poolName = String.format("ftttp-%d", req.getInstanceId()) + "-%d";
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
this.scheduledPool = Executors.newScheduledThreadPool(4, factory);
this.alertManager = constructAlertManager(req);
// 2. 启动任务发射器
launcher = new Launcher();
if (timeExpressionType == TimeExpressionType.FIXED_RATE) {
@ -97,7 +114,7 @@ public class FrequentTaskTracker extends TaskTracker {
throw new PowerJobException("time interval too small, please set the timeExpressionInfo >= 1000");
}
scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS);
}else {
} else {
scheduledPool.schedule(launcher, 0, TimeUnit.MILLISECONDS);
}
@ -203,7 +220,7 @@ public class FrequentTaskTracker extends TaskTracker {
public void run() {
try {
innerRun();
}catch (Exception e) {
} catch (Exception e) {
log.error("[FQTaskTracker-{}] launch task failed.", instanceId, e);
}
}
@ -224,7 +241,7 @@ public class FrequentTaskTracker extends TaskTracker {
try {
checkStatus();
reportStatus();
}catch (Exception e) {
} catch (Exception e) {
log.warn("[FQTaskTracker-{}] check and report status failed.", instanceId, e);
}
}
@ -283,12 +300,12 @@ public class FrequentTaskTracker extends TaskTracker {
boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
onFinished(subInstanceId, success, resultTask.getResult(), iterator);
continue;
// MAP 不关心结果最简单
// MAP 不关心结果最简单
case MAP:
String result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
onFinished(subInstanceId, holder.failedNum == 0, result, iterator);
continue;
// MapReduce BroadCast 需要根据是否有 LAST_TASK 来判断结束与否
// MapReduce BroadCast 需要根据是否有 LAST_TASK 来判断结束与否
default:
Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, subInstanceId);
if (lastTaskOptional.isPresent()) {
@ -297,7 +314,7 @@ public class FrequentTaskTracker extends TaskTracker {
if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
onFinished(subInstanceId, lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS, lastTaskOptional.get().getResult(), iterator);
}
}else {
} else {
// 创建最终任务并提交执行
TaskDO newLastTask = new TaskDO();
@ -333,6 +350,13 @@ public class FrequentTaskTracker extends TaskTracker {
req.setFailedTaskNum(failedTimes.get());
req.setSourceAddress(workerRuntime.getWorkerAddress());
// alert
if (alertManager.alert()) {
req.setNeedAlert(true);
req.setAlertContent(alertManager.getAlertContent());
log.warn("[FQTaskTracker-{}] report alert req,time:{}", instanceId, req.getReportTime());
}
String serverPath = AkkaUtils.getServerActorPath(currentServerAddress);
if (StringUtils.isEmpty(serverPath)) {
return;
@ -352,10 +376,12 @@ public class FrequentTaskTracker extends TaskTracker {
}
private void processFinishedSubInstance(long subInstanceId, boolean success, String result) {
long currentTime = System.currentTimeMillis();
if (success) {
succeedTimes.incrementAndGet();
} else {
failedTimes.incrementAndGet();
alertManager.update(currentTime, result);
}
// 从运行中任务列表移除
@ -366,7 +392,7 @@ public class FrequentTaskTracker extends TaskTracker {
if (subInstanceInfo != null) {
subInstanceInfo.status = success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV();
subInstanceInfo.result = result;
subInstanceInfo.finishedTime = System.currentTimeMillis();
subInstanceInfo.finishedTime = currentTime;
}
// 删除数据库相关数据
taskPersistenceService.deleteAllSubInstanceTasks(instanceId, subInstanceId);
@ -377,6 +403,86 @@ public class FrequentTaskTracker extends TaskTracker {
}
}
private AlertManager constructAlertManager(ServerScheduleJobReq req) {
long rate = Long.parseLong(req.getTimeExpression());
if (!StringUtils.isEmpty(req.getAlarmConfig())) {
try {
log.debug("[FQTaskTracker-{}] alert config:{}", instanceId, req.getAlarmConfig());
AlarmConfig alarmConfig = JsonUtils.parseObject(req.getAlarmConfig(), AlarmConfig.class);
return new AlertManager(alarmConfig);
} catch (JsonProcessingException ignore) {
//
}
}
// 默认配置失败一次就告警沉默窗口 5 分钟
int statisticWindowLen = Math.max((int) (2 * rate / 1000), 1);
return new AlertManager(new AlarmConfig(1, statisticWindowLen, 300));
}
private class AlertManager {
/**
* 记录执行失败的时间
*/
private final LinkedList<Long> failedRecordList = new LinkedList<>();
/**
* 告警配置
*/
private final AlarmConfig config;
/**
* 告警的激活时间
*/
private long alarmActiveTime = 0L;
/**
* 告警的内容记录最后一次失败的任务执行结果
*/
private String content;
/**
* 是否处于激活状态
*/
private boolean active;
public AlertManager(AlarmConfig config) {
log.info("[FQTaskTracker-{}] create alert manager,alertThreshold:{},statisticWindowLen:{} s,silenceWindowLen:{} s", instanceId, config.getAlertThreshold(), config.getStatisticWindowLen(), config.getSilenceWindowLen());
this.config = config;
}
public synchronized void update(long currentTime, String result) {
log.debug("[FQTaskTracker-{}] update alert statistic info,currentTime:{}", instanceId, currentTime);
if (currentTime < alarmActiveTime + config.getSilenceWindowLen() * 1000) {
// 处于沉默窗口内
return;
}
// 当前统计窗口允许的最小值
long minTime = currentTime - config.getStatisticWindowLen() * 1000;
while (!failedRecordList.isEmpty() && failedRecordList.peekFirst() < minTime) {
failedRecordList.removeFirst();
}
failedRecordList.add(currentTime);
if (failedRecordList.size() >= config.getAlertThreshold()) {
// 标记需要告警
active = true;
alarmActiveTime = currentTime;
content = result;
}
}
public synchronized boolean alert() {
if (active) {
active = false;
return true;
}
return false;
}
public String getAlertContent() {
return content;
}
}
@Data
private static class SubInstanceInfo {
private int status;