[dev] rewrite AlarmService & AlarmCenter

This commit is contained in:
tjq 2020-08-08 13:58:56 +08:00
parent 6d0e7f8e36
commit 55864771f5
10 changed files with 57 additions and 109 deletions

View File

@ -12,10 +12,6 @@ public class PowerJobServerConfigKey {
* akka 端口号
*/
public static final String AKKA_PORT = "oms.akka.port";
/**
* alarm bean 名称多值逗号分隔
*/
public static final String ALARM_BEAN_NAMES = "oms.alarm.bean.names";
/**
* 自定义数据库表前缀
*/

View File

@ -13,7 +13,6 @@ import java.util.concurrent.*;
/**
* 公用线程池配置
* omsTimingPool用于执行定时任务的线程池
* omsCommonPool用于执行普通任务的线程池
* omsBackgroundPool用于执行后台任务的线程池这类任务对时间不敏感慢慢执行细水长流即可
* taskScheduler用于定时调度的线程池
*
@ -42,19 +41,6 @@ public class ThreadPoolConfig {
return executor;
}
@Bean("omsCommonPool")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsCommonPool-");
executor.setRejectedExecutionHandler(new LogOnRejected());
return executor;
}
@Bean("omsBackgroundPool")
public Executor initBackgroundPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

View File

@ -0,0 +1,45 @@
package com.github.kfcfans.powerjob.server.service.alarm;
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.*;
/**
* 报警服务
*
* @author tjq
* @since 2020/4/19
*/
@Slf4j
public class AlarmCenter {
private static final ExecutorService POOL;
private static final List<Alarmable> BEANS = Lists.newLinkedList();
static {
int cores = Runtime.getRuntime().availableProcessors();
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("AlarmPool-%d").build();
POOL = new ThreadPoolExecutor(cores, cores, 5, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue(), factory);
}
public static void alarmFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
POOL.execute(() -> BEANS.forEach(alarmable -> {
try {
alarmable.onFailed(alarm, targetUserList);
}catch (Exception e) {
log.warn("[AlarmCenter] alarm failed.", e);
}
}));
}
public static void register(Alarmable alarmable) {
BEANS.add(alarmable);
log.info("[AlarmCenter] bean(className={},obj={}) register to AlarmCenter successfully!", alarmable.getClass().getName(), alarmable);
}
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.service.alarm;
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
import org.springframework.beans.factory.InitializingBean;
import java.util.List;
@ -10,8 +11,12 @@ import java.util.List;
* @author tjq
* @since 2020/4/19
*/
public interface Alarmable {
public interface Alarmable extends InitializingBean {
void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
@Override
default void afterPropertiesSet() throws Exception {
AlarmCenter.register(this);
}
}

View File

@ -1,77 +0,0 @@
package com.github.kfcfans.powerjob.server.service.alarm;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
/**
* 报警服务
*
* @author tjq
* @since 2020/4/19
*/
@Slf4j
@Service("omsCenterAlarmService")
public class OmsCenterAlarmService implements Alarmable {
@Resource
private Environment environment;
private List<Alarmable> alarmableList;
private volatile boolean initialized = false;
@Async("omsCommonPool")
@Override
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
init();
alarmableList.forEach(alarmable -> {
try {
alarmable.onFailed(alarm, targetUserList);
}catch (Exception e) {
log.warn("[OmsCenterAlarmService] alarm failed.", e);
}
});
}
/**
* 初始化
* 使用 InitializingBean 进行初始化会导致 NPE因为没办法控制Bean开发者自己实现的Bean的加载顺序
*/
private void init() {
if (initialized) {
return;
}
synchronized (this) {
if (initialized) {
return;
}
alarmableList = Lists.newLinkedList();
String beanNames = environment.getProperty(PowerJobServerConfigKey.ALARM_BEAN_NAMES);
if (StringUtils.isNotEmpty(beanNames)) {
SJ.commaSplitter.split(beanNames).forEach(beanName -> {
try {
Alarmable bean = (Alarmable) SpringUtils.getBean(beanName);
alarmableList.add(bean);
log.info("[OmsCenterAlarmService] load Alarmable for bean: {} successfully.", beanName);
}catch (Exception e) {
log.warn("[OmsCenterAlarmService] initialize Alarmable for bean: {} failed.", beanName, e);
}
});
}
initialized = true;
}
}
}

View File

@ -29,7 +29,7 @@ import java.util.Set;
* @since 2020/8/6
*/
@Slf4j
@Service("dingTalkAlarmService")
@Service
public class DingTalkAlarmService implements Alarmable {
@Resource

View File

@ -22,7 +22,7 @@ import java.util.List;
* @since 2020/4/30
*/
@Slf4j
@Service("mailAlarmService")
@Service
public class MailAlarmService implements Alarmable {
@Resource

View File

@ -11,7 +11,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.service.InstanceLogService;
import com.github.kfcfans.powerjob.server.service.UserService;
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter;
import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarm;
import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
@ -38,8 +38,6 @@ public class InstanceManager {
private DispatchService dispatchService;
@Resource
private InstanceLogService instanceLogService;
@Resource(name = "omsCenterAlarmService")
private Alarmable omsCenterAlarmService;
@Resource
private InstanceMetadataService instanceMetadataService;
@Resource
@ -167,7 +165,7 @@ public class InstanceManager {
BeanUtils.copyProperties(instanceInfo, content);
List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds());
omsCenterAlarmService.onFailed(content, userList);
AlarmCenter.alarmFailed(content, userList);
}
// 主动移除缓存减小内存占用

View File

@ -19,7 +19,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowIn
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.service.UserService;
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter;
import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarm;
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
@ -62,9 +62,6 @@ public class WorkflowInstanceManager {
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Resource(name = "omsCenterAlarmService")
private Alarmable omsCenterAlarmService;
private final SegmentLock segmentLock = new SegmentLock(16);
/**
@ -339,7 +336,7 @@ public class WorkflowInstanceManager {
content.setResult(result);
List<UserInfoDO> userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds());
omsCenterAlarmService.onFailed(content, userList);
AlarmCenter.alarmFailed(content, userList);
});
}catch (Exception ignore) {
}

View File

@ -16,7 +16,5 @@ spring.servlet.multipart.max-request-size=209715200
###### PowerJob 自身配置(该配置只允许存在于 application.properties 文件中) ######
# akka ActorSystem 服务端口
oms.akka.port=10086
# 报警服务 bean名称
oms.alarm.bean.names=mailAlarmService,dingTalkAlarmService
# 表前缀(默认无表前缀,有需求直接填入表前缀即可,比如 pj_
oms.table-prefix=