mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
[dev] redesign Alarmable interface which is more oo
This commit is contained in:
parent
986aa712a0
commit
671df61390
@ -11,7 +11,9 @@ import org.apache.commons.lang3.StringUtils;
|
|||||||
* @author tjq
|
* @author tjq
|
||||||
* @since 2020/8/1
|
* @since 2020/8/1
|
||||||
*/
|
*/
|
||||||
public interface AlarmContent extends OmsSerializable {
|
public interface Alarm extends OmsSerializable {
|
||||||
|
|
||||||
|
String fetchTitle();
|
||||||
|
|
||||||
default String fetchContent() {
|
default String fetchContent() {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
@ -12,17 +12,6 @@ import java.util.List;
|
|||||||
*/
|
*/
|
||||||
public interface Alarmable {
|
public interface Alarmable {
|
||||||
|
|
||||||
/**
|
void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
|
||||||
* 任务执行失败报警
|
|
||||||
* @param content 任务实例相关信息
|
|
||||||
* @param targetUserList 目标用户列表
|
|
||||||
*/
|
|
||||||
void onJobInstanceFailed(JobInstanceAlarmContent content, List<UserInfoDO> targetUserList);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 工作流执行失败报警
|
|
||||||
* @param content 工作流实例相关信息
|
|
||||||
* @param targetUserList 目标用户列表
|
|
||||||
*/
|
|
||||||
void onWorkflowInstanceFailed(WorkflowInstanceAlarmContent content, List<UserInfoDO> targetUserList);
|
|
||||||
}
|
}
|
||||||
|
@ -31,26 +31,9 @@ public class DefaultMailAlarmService implements Alarmable {
|
|||||||
private String from;
|
private String from;
|
||||||
private static final String FROM_KEY = "spring.mail.username";
|
private static final String FROM_KEY = "spring.mail.username";
|
||||||
|
|
||||||
private static final String MAIL_TITLE = "PowerJob AlarmService";
|
|
||||||
private static final String JOB_INSTANCE_FAILED_CONTENT_PATTERN = "Job run failed, detail is: %s";
|
|
||||||
private static final String WF_INSTANCE_FAILED_CONTENT_PATTERN = "Workflow run failed, detail is: %s";
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onJobInstanceFailed(JobInstanceAlarmContent content, List<UserInfoDO> targetUserList) {
|
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
|
||||||
String msg = String.format(JOB_INSTANCE_FAILED_CONTENT_PATTERN, content.fetchContent());
|
|
||||||
sendMail(msg, targetUserList);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onWorkflowInstanceFailed(WorkflowInstanceAlarmContent content, List<UserInfoDO> targetUserList) {
|
|
||||||
String msg = String.format(WF_INSTANCE_FAILED_CONTENT_PATTERN, content.fetchContent());
|
|
||||||
sendMail(msg, targetUserList);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendMail(String msg, List<UserInfoDO> targetUserList) {
|
|
||||||
|
|
||||||
initFrom();
|
initFrom();
|
||||||
log.debug("[OmsMailAlarmService] msg: {}, to: {}", msg, targetUserList);
|
|
||||||
if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
|
if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -59,8 +42,8 @@ public class DefaultMailAlarmService implements Alarmable {
|
|||||||
try {
|
try {
|
||||||
sm.setFrom(from);
|
sm.setFrom(from);
|
||||||
sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).toArray(String[]::new));
|
sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).toArray(String[]::new));
|
||||||
sm.setSubject(MAIL_TITLE);
|
sm.setSubject(alarm.fetchTitle());
|
||||||
sm.setText(msg);
|
sm.setText(alarm.fetchContent());
|
||||||
|
|
||||||
javaMailSender.send(sm);
|
javaMailSender.send(sm);
|
||||||
}catch (Exception e) {
|
}catch (Exception e) {
|
||||||
|
@ -9,7 +9,7 @@ import lombok.Data;
|
|||||||
* @since 2020/4/30
|
* @since 2020/4/30
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class JobInstanceAlarmContent implements AlarmContent {
|
public class JobInstanceAlarm implements Alarm {
|
||||||
// 应用ID
|
// 应用ID
|
||||||
private long appId;
|
private long appId;
|
||||||
// 任务ID
|
// 任务ID
|
||||||
@ -43,4 +43,9 @@ public class JobInstanceAlarmContent implements AlarmContent {
|
|||||||
private Long finishedTime;
|
private Long finishedTime;
|
||||||
// TaskTracker地址
|
// TaskTracker地址
|
||||||
private String taskTrackerAddress;
|
private String taskTrackerAddress;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String fetchTitle() {
|
||||||
|
return "PowerJob AlarmService: Job Running Failed";
|
||||||
|
}
|
||||||
}
|
}
|
@ -29,29 +29,14 @@ public class OmsCenterAlarmService implements Alarmable {
|
|||||||
private List<Alarmable> alarmableList;
|
private List<Alarmable> alarmableList;
|
||||||
private volatile boolean initialized = false;
|
private volatile boolean initialized = false;
|
||||||
|
|
||||||
public OmsCenterAlarmService() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Async("omsCommonPool")
|
@Async("omsCommonPool")
|
||||||
@Override
|
@Override
|
||||||
public void onJobInstanceFailed(JobInstanceAlarmContent content, List<UserInfoDO> targetUserList) {
|
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
|
||||||
init();
|
init();
|
||||||
alarmableList.forEach(alarmable -> {
|
alarmableList.forEach(alarmable -> {
|
||||||
try {
|
try {
|
||||||
alarmable.onJobInstanceFailed(content, targetUserList);
|
alarmable.onFailed(alarm, targetUserList);
|
||||||
}catch (Exception e) {
|
|
||||||
log.warn("[OmsCenterAlarmService] alarm failed.", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Async("omsCommonPool")
|
|
||||||
@Override
|
|
||||||
public void onWorkflowInstanceFailed(WorkflowInstanceAlarmContent content, List<UserInfoDO> targetUserList) {
|
|
||||||
init();
|
|
||||||
alarmableList.forEach(alarmable -> {
|
|
||||||
try {
|
|
||||||
alarmable.onWorkflowInstanceFailed(content, targetUserList);
|
|
||||||
}catch (Exception e) {
|
}catch (Exception e) {
|
||||||
log.warn("[OmsCenterAlarmService] alarm failed.", e);
|
log.warn("[OmsCenterAlarmService] alarm failed.", e);
|
||||||
}
|
}
|
||||||
@ -86,4 +71,5 @@ public class OmsCenterAlarmService implements Alarmable {
|
|||||||
initialized = true;
|
initialized = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ import lombok.Data;
|
|||||||
* @since 2020/6/12
|
* @since 2020/6/12
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class WorkflowInstanceAlarmContent implements AlarmContent {
|
public class WorkflowInstanceAlarm implements Alarm {
|
||||||
|
|
||||||
private String workflowName;
|
private String workflowName;
|
||||||
|
|
||||||
@ -34,4 +34,9 @@ public class WorkflowInstanceAlarmContent implements AlarmContent {
|
|||||||
private Integer timeExpressionType;
|
private Integer timeExpressionType;
|
||||||
// 时间表达式,CRON/NULL/LONG/LONG
|
// 时间表达式,CRON/NULL/LONG/LONG
|
||||||
private String timeExpression;
|
private String timeExpression;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String fetchTitle() {
|
||||||
|
return "PowerJob AlarmService: Workflow Running Failed";
|
||||||
|
}
|
||||||
}
|
}
|
@ -12,7 +12,7 @@ import com.github.kfcfans.powerjob.server.service.DispatchService;
|
|||||||
import com.github.kfcfans.powerjob.server.service.InstanceLogService;
|
import com.github.kfcfans.powerjob.server.service.InstanceLogService;
|
||||||
import com.github.kfcfans.powerjob.server.service.UserService;
|
import com.github.kfcfans.powerjob.server.service.UserService;
|
||||||
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
|
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
|
||||||
import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarmContent;
|
import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarm;
|
||||||
import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
|
import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
|
||||||
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
|
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -162,12 +162,12 @@ public class InstanceManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
|
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
|
||||||
JobInstanceAlarmContent content = new JobInstanceAlarmContent();
|
JobInstanceAlarm content = new JobInstanceAlarm();
|
||||||
BeanUtils.copyProperties(jobInfo, content);
|
BeanUtils.copyProperties(jobInfo, content);
|
||||||
BeanUtils.copyProperties(instanceInfo, content);
|
BeanUtils.copyProperties(instanceInfo, content);
|
||||||
|
|
||||||
List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds());
|
List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds());
|
||||||
omsCenterAlarmService.onJobInstanceFailed(content, userList);
|
omsCenterAlarmService.onFailed(content, userList);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 主动移除缓存,减小内存占用
|
// 主动移除缓存,减小内存占用
|
||||||
|
@ -20,7 +20,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowIn
|
|||||||
import com.github.kfcfans.powerjob.server.service.DispatchService;
|
import com.github.kfcfans.powerjob.server.service.DispatchService;
|
||||||
import com.github.kfcfans.powerjob.server.service.UserService;
|
import com.github.kfcfans.powerjob.server.service.UserService;
|
||||||
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
|
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
|
||||||
import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarmContent;
|
import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarm;
|
||||||
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
|
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
|
||||||
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
|
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
|
||||||
import com.google.common.collect.LinkedListMultimap;
|
import com.google.common.collect.LinkedListMultimap;
|
||||||
@ -332,14 +332,14 @@ public class WorkflowInstanceManager {
|
|||||||
// 报警
|
// 报警
|
||||||
try {
|
try {
|
||||||
workflowInfoRepository.findById(wfInstance.getWorkflowId()).ifPresent(wfInfo -> {
|
workflowInfoRepository.findById(wfInstance.getWorkflowId()).ifPresent(wfInfo -> {
|
||||||
WorkflowInstanceAlarmContent content = new WorkflowInstanceAlarmContent();
|
WorkflowInstanceAlarm content = new WorkflowInstanceAlarm();
|
||||||
|
|
||||||
BeanUtils.copyProperties(wfInfo, content);
|
BeanUtils.copyProperties(wfInfo, content);
|
||||||
BeanUtils.copyProperties(wfInstance, content);
|
BeanUtils.copyProperties(wfInstance, content);
|
||||||
content.setResult(result);
|
content.setResult(result);
|
||||||
|
|
||||||
List<UserInfoDO> userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds());
|
List<UserInfoDO> userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds());
|
||||||
omsCenterAlarmService.onWorkflowInstanceFailed(content, userList);
|
omsCenterAlarmService.onFailed(content, userList);
|
||||||
});
|
});
|
||||||
}catch (Exception ignore) {
|
}catch (Exception ignore) {
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user