From 55864771f5abae7143ce4a8e41394254f4f98e21 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 8 Aug 2020 13:58:56 +0800 Subject: [PATCH] [dev] rewrite AlarmService & AlarmCenter --- .../common/PowerJobServerConfigKey.java | 4 - .../common/config/ThreadPoolConfig.java | 14 ---- .../server/service/alarm/AlarmCenter.java | 45 +++++++++++ .../server/service/alarm/Alarmable.java | 7 +- .../service/alarm/OmsCenterAlarmService.java | 77 ------------------- .../alarm/impl/DingTalkAlarmService.java | 2 +- .../service/alarm/impl/MailAlarmService.java | 2 +- .../service/instance/InstanceManager.java | 6 +- .../workflow/WorkflowInstanceManager.java | 7 +- .../src/main/resources/application.properties | 2 - 10 files changed, 57 insertions(+), 109 deletions(-) create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/AlarmCenter.java delete mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/OmsCenterAlarmService.java diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java index f8dcd764..c31f748f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java @@ -12,10 +12,6 @@ public class PowerJobServerConfigKey { * akka 端口号 */ public static final String AKKA_PORT = "oms.akka.port"; - /** - * alarm bean 名称,多值逗号分隔 - */ - public static final String ALARM_BEAN_NAMES = "oms.alarm.bean.names"; /** * 自定义数据库表前缀 */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java index fa503d5e..dca75fa7 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java @@ -13,7 +13,6 @@ import java.util.concurrent.*; /** * 公用线程池配置 * omsTimingPool:用于执行定时任务的线程池 - * omsCommonPool:用于执行普通任务的线程池 * omsBackgroundPool:用于执行后台任务的线程池,这类任务对时间不敏感,慢慢执行细水长流即可 * taskScheduler:用于定时调度的线程池 * @@ -42,19 +41,6 @@ public class ThreadPoolConfig { return executor; } - - @Bean("omsCommonPool") - public Executor taskExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); - executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors()); - executor.setQueueCapacity(1024); - executor.setKeepAliveSeconds(60); - executor.setThreadNamePrefix("omsCommonPool-"); - executor.setRejectedExecutionHandler(new LogOnRejected()); - return executor; - } - @Bean("omsBackgroundPool") public Executor initBackgroundPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/AlarmCenter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/AlarmCenter.java new file mode 100644 index 00000000..3e0ee3d8 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/AlarmCenter.java @@ -0,0 +1,45 @@ +package com.github.kfcfans.powerjob.server.service.alarm; + +import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO; +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.*; + +/** + * 报警服务 + * + * @author tjq + * @since 2020/4/19 + */ +@Slf4j +public class AlarmCenter { + + private static final ExecutorService POOL; + private static final List BEANS = Lists.newLinkedList(); + + static { + int cores = Runtime.getRuntime().availableProcessors(); + ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("AlarmPool-%d").build(); + POOL = new ThreadPoolExecutor(cores, cores, 5, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue(), factory); + } + + + public static void alarmFailed(Alarm alarm, List targetUserList) { + POOL.execute(() -> BEANS.forEach(alarmable -> { + try { + alarmable.onFailed(alarm, targetUserList); + }catch (Exception e) { + log.warn("[AlarmCenter] alarm failed.", e); + } + })); + } + + public static void register(Alarmable alarmable) { + BEANS.add(alarmable); + log.info("[AlarmCenter] bean(className={},obj={}) register to AlarmCenter successfully!", alarmable.getClass().getName(), alarmable); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/Alarmable.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/Alarmable.java index d7fa2f43..715bfd7f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/Alarmable.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/Alarmable.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.server.service.alarm; import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO; +import org.springframework.beans.factory.InitializingBean; import java.util.List; @@ -10,8 +11,12 @@ import java.util.List; * @author tjq * @since 2020/4/19 */ -public interface Alarmable { +public interface Alarmable extends InitializingBean { void onFailed(Alarm alarm, List targetUserList); + @Override + default void afterPropertiesSet() throws Exception { + AlarmCenter.register(this); + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/OmsCenterAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/OmsCenterAlarmService.java deleted file mode 100644 index 2f89350f..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/OmsCenterAlarmService.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.github.kfcfans.powerjob.server.service.alarm; - -import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; -import com.github.kfcfans.powerjob.server.common.SJ; -import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; -import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO; -import com.google.common.collect.Lists; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.core.env.Environment; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Service; - -import javax.annotation.Resource; -import java.util.List; - -/** - * 报警服务 - * - * @author tjq - * @since 2020/4/19 - */ -@Slf4j -@Service("omsCenterAlarmService") -public class OmsCenterAlarmService implements Alarmable { - - @Resource - private Environment environment; - - private List alarmableList; - private volatile boolean initialized = false; - - @Async("omsCommonPool") - @Override - public void onFailed(Alarm alarm, List targetUserList) { - init(); - alarmableList.forEach(alarmable -> { - try { - alarmable.onFailed(alarm, targetUserList); - }catch (Exception e) { - log.warn("[OmsCenterAlarmService] alarm failed.", e); - } - }); - } - - /** - * 初始化 - * 使用 InitializingBean 进行初始化会导致 NPE,因为没办法控制Bean(开发者自己实现的Bean)的加载顺序 - */ - private void init() { - - if (initialized) { - return; - } - synchronized (this) { - if (initialized) { - return; - } - - alarmableList = Lists.newLinkedList(); - String beanNames = environment.getProperty(PowerJobServerConfigKey.ALARM_BEAN_NAMES); - if (StringUtils.isNotEmpty(beanNames)) { - SJ.commaSplitter.split(beanNames).forEach(beanName -> { - try { - Alarmable bean = (Alarmable) SpringUtils.getBean(beanName); - alarmableList.add(bean); - log.info("[OmsCenterAlarmService] load Alarmable for bean: {} successfully.", beanName); - }catch (Exception e) { - log.warn("[OmsCenterAlarmService] initialize Alarmable for bean: {} failed.", beanName, e); - } - }); - } - initialized = true; - } - } - -} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java index b40196ad..a9cca5da 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java @@ -29,7 +29,7 @@ import java.util.Set; * @since 2020/8/6 */ @Slf4j -@Service("dingTalkAlarmService") +@Service public class DingTalkAlarmService implements Alarmable { @Resource diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java index 9a2a0481..78ae48cb 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java @@ -22,7 +22,7 @@ import java.util.List; * @since 2020/4/30 */ @Slf4j -@Service("mailAlarmService") +@Service public class MailAlarmService implements Alarmable { @Resource diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java index 6f752a3d..48e825f8 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java @@ -11,7 +11,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn import com.github.kfcfans.powerjob.server.service.DispatchService; import com.github.kfcfans.powerjob.server.service.InstanceLogService; import com.github.kfcfans.powerjob.server.service.UserService; -import com.github.kfcfans.powerjob.server.service.alarm.Alarmable; +import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter; import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarm; import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder; import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager; @@ -38,8 +38,6 @@ public class InstanceManager { private DispatchService dispatchService; @Resource private InstanceLogService instanceLogService; - @Resource(name = "omsCenterAlarmService") - private Alarmable omsCenterAlarmService; @Resource private InstanceMetadataService instanceMetadataService; @Resource @@ -167,7 +165,7 @@ public class InstanceManager { BeanUtils.copyProperties(instanceInfo, content); List userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds()); - omsCenterAlarmService.onFailed(content, userList); + AlarmCenter.alarmFailed(content, userList); } // 主动移除缓存,减小内存占用 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index 1f11b1f2..d9e7fab2 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -19,7 +19,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowIn import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.powerjob.server.service.DispatchService; import com.github.kfcfans.powerjob.server.service.UserService; -import com.github.kfcfans.powerjob.server.service.alarm.Alarmable; +import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter; import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarm; import com.github.kfcfans.powerjob.server.service.id.IdGenerateService; import com.github.kfcfans.powerjob.server.service.instance.InstanceService; @@ -62,9 +62,6 @@ public class WorkflowInstanceManager { @Resource private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; - @Resource(name = "omsCenterAlarmService") - private Alarmable omsCenterAlarmService; - private final SegmentLock segmentLock = new SegmentLock(16); /** @@ -339,7 +336,7 @@ public class WorkflowInstanceManager { content.setResult(result); List userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds()); - omsCenterAlarmService.onFailed(content, userList); + AlarmCenter.alarmFailed(content, userList); }); }catch (Exception ignore) { } diff --git a/powerjob-server/src/main/resources/application.properties b/powerjob-server/src/main/resources/application.properties index 94e29fe3..d91d0684 100644 --- a/powerjob-server/src/main/resources/application.properties +++ b/powerjob-server/src/main/resources/application.properties @@ -16,7 +16,5 @@ spring.servlet.multipart.max-request-size=209715200 ###### PowerJob 自身配置(该配置只允许存在于 application.properties 文件中) ###### # akka ActorSystem 服务端口 oms.akka.port=10086 -# 报警服务 bean名称 -oms.alarm.bean.names=mailAlarmService,dingTalkAlarmService # 表前缀(默认无表前缀,有需求直接填入表前缀即可,比如 pj_ ) oms.table-prefix= \ No newline at end of file