From 6d0e7f8e36f1d46da772e34d145cd3bbcd4e6c93 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 8 Aug 2020 12:51:36 +0800 Subject: [PATCH 1/9] [dev] support DingTalk alarm servcie #45 --- powerjob-server/pom.xml | 9 ++ .../common/PowerJobServerConfigKey.java | 7 + .../server/common/utils/DingTalkUtils.java | 130 ++++++++++++++++++ .../alarm/impl/DingTalkAlarmService.java | 107 ++++++++++++++ .../MailAlarmService.java} | 8 +- .../resources/application-daily.properties | 5 + .../main/resources/application-pre.properties | 5 + .../resources/application-product.properties | 5 + .../src/main/resources/application.properties | 2 +- .../powerjob/server/test/DingTalkTest.java | 37 +++++ 10 files changed, 311 insertions(+), 4 deletions(-) create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java rename powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/{DefaultMailAlarmService.java => impl/MailAlarmService.java} (87%) create mode 100644 powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/DingTalkTest.java diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 835d725b..2c5fce49 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -30,6 +30,7 @@ 3.0.1 3.6 1.2.68 + 1.0.1 true @@ -160,6 +161,14 @@ ${fastjson.version} + + + com.aliyun + alibaba-dingtalk-service-sdk + ${dingding.version} + + + diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java index dabae0dd..f8dcd764 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java @@ -24,4 +24,11 @@ public class PowerJobServerConfigKey { * 是否使用 mongoDB */ public static final String MONGODB_ENABLE = "oms.mongodb.enable"; + + /** + * 钉钉报警相关 + */ + public static final String DING_APP_KEY = "oms.alarm.ding.app-key"; + public static final String DING_APP_SECRET = "oms.alarm.ding.app-secret"; + public static final String DING_AGENT_ID = "oms.alarm.ding.agent-id"; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java new file mode 100644 index 00000000..a89ad17c --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java @@ -0,0 +1,130 @@ +package com.github.kfcfans.powerjob.server.common.utils; + +import com.dingtalk.api.DefaultDingTalkClient; +import com.dingtalk.api.DingTalkClient; +import com.dingtalk.api.request.OapiGettokenRequest; +import com.dingtalk.api.request.OapiMessageCorpconversationAsyncsendV2Request; +import com.dingtalk.api.request.OapiUserGetByMobileRequest; +import com.dingtalk.api.response.OapiGettokenResponse; +import com.dingtalk.api.response.OapiUserGetByMobileResponse; +import com.github.kfcfans.powerjob.common.OmsException; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.http.HttpMethod; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * 钉钉工具类 + * 工作通知消息:https://ding-doc.dingtalk.com/doc#/serverapi2/pgoxpy + * + * @author tjq + * @since 2020/8/8 + */ +@Slf4j +public class DingTalkUtils implements Closeable { + + private String accessToken; + + private final DingTalkClient sendMsgClient; + private final DingTalkClient accessTokenClient; + private final DingTalkClient userIdClient; + private final ScheduledExecutorService scheduledPool; + + private static final long FLUSH_ACCESS_TOKEN_RATE = 6000; + private static final String GET_TOKEN_URL = "https://oapi.dingtalk.com/gettoken"; + private static final String SEND_URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2"; + private static final String GET_USER_ID_URL = "https://oapi.dingtalk.com/user/get_by_mobile"; + + + public DingTalkUtils(String appKey, String appSecret) { + + this.sendMsgClient = new DefaultDingTalkClient(SEND_URL); + this.accessTokenClient = new DefaultDingTalkClient(GET_TOKEN_URL); + this.userIdClient = new DefaultDingTalkClient(GET_USER_ID_URL); + + refreshAccessToken(appKey, appSecret); + + if (StringUtils.isEmpty(accessToken)) { + throw new OmsException("fetch AccessToken failed, please check your appKey & appSecret"); + } + + scheduledPool = Executors.newSingleThreadScheduledExecutor(); + scheduledPool.scheduleAtFixedRate(() -> refreshAccessToken(appKey, appSecret), FLUSH_ACCESS_TOKEN_RATE, FLUSH_ACCESS_TOKEN_RATE, TimeUnit.SECONDS); + } + + /** + * 获取 AccessToken,AccessToken 是调用其他接口的基础,有效期 7200 秒,需要不断刷新 + * @param appKey 应用 appKey + * @param appSecret 应用 appSecret + */ + private void refreshAccessToken(String appKey, String appSecret) { + try { + OapiGettokenRequest req = new OapiGettokenRequest(); + req.setAppkey(appKey); + req.setAppsecret(appSecret); + req.setHttpMethod(HttpMethod.GET.name()); + OapiGettokenResponse rsp = accessTokenClient.execute(req); + + if (rsp.isSuccess()) { + accessToken = rsp.getAccessToken(); + }else { + log.warn("[DingTalkUtils] flush accessToken failed with req({}),code={},msg={}.", req.getTextParams(), rsp.getErrcode(), rsp.getErrmsg()); + } + } catch (Exception e) { + log.warn("[DingTalkUtils] flush accessToken failed.", e); + } + } + + public String fetchUserIdByMobile(String mobile) throws Exception { + OapiUserGetByMobileRequest request = new OapiUserGetByMobileRequest(); + request.setMobile(mobile); + + OapiUserGetByMobileResponse execute = userIdClient.execute(request, accessToken); + if (execute.isSuccess()) { + return execute.getUserid(); + } + throw new OmsException("fetch userId by phone number failed, reason is " + execute.getErrmsg()); + } + + public void sendMarkdownAsync(String title, List entities, String userList, Long agentId) throws Exception { + OapiMessageCorpconversationAsyncsendV2Request request = new OapiMessageCorpconversationAsyncsendV2Request(); + request.setUseridList(userList); + request.setAgentId(agentId); + request.setToAllUser(false); + + OapiMessageCorpconversationAsyncsendV2Request.Msg msg = new OapiMessageCorpconversationAsyncsendV2Request.Msg(); + + StringBuilder mdBuilder=new StringBuilder(); + mdBuilder.append("## ").append(title).append("\n"); + for (MarkdownEntity entity:entities){ + mdBuilder.append("#### ").append(entity.title).append("\n"); + mdBuilder.append("> ").append(entity.detail).append("\n\n"); + } + + msg.setMsgtype("markdown"); + msg.setMarkdown(new OapiMessageCorpconversationAsyncsendV2Request.Markdown()); + msg.getMarkdown().setTitle(title); + msg.getMarkdown().setText(mdBuilder.toString()); + request.setMsg(msg); + + sendMsgClient.execute(request, accessToken); + } + + @Override + public void close() throws IOException { + scheduledPool.shutdownNow(); + } + + @AllArgsConstructor + public static final class MarkdownEntity { + private String title; + private String detail; + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java new file mode 100644 index 00000000..b40196ad --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java @@ -0,0 +1,107 @@ +package com.github.kfcfans.powerjob.server.service.alarm.impl; + +import com.github.kfcfans.powerjob.common.OmsException; +import com.github.kfcfans.powerjob.common.utils.NetUtils; +import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; +import com.github.kfcfans.powerjob.server.common.SJ; +import com.github.kfcfans.powerjob.server.common.utils.DingTalkUtils; +import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO; +import com.github.kfcfans.powerjob.server.service.alarm.Alarm; +import com.github.kfcfans.powerjob.server.service.alarm.Alarmable; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.List; +import java.util.Set; + +/** + * 钉钉告警服务 + * + * @author tjq + * @since 2020/8/6 + */ +@Slf4j +@Service("dingTalkAlarmService") +public class DingTalkAlarmService implements Alarmable { + + @Resource + private Environment environment; + + private Long agentId; + private DingTalkUtils dingTalkUtils; + private Cache mobile2UserIdCache; + + private static final int CACHE_SIZE = 8192; + // 防止缓存击穿 + private static final String EMPTY_TAG = "EMPTY"; + + @Override + public void onFailed(Alarm alarm, List targetUserList) { + if (dingTalkUtils == null) { + return; + } + Set userIds = Sets.newHashSet(); + targetUserList.forEach(user -> { + try { + String userId = mobile2UserIdCache.get(user.getPhone(), () -> { + try { + return dingTalkUtils.fetchUserIdByMobile(user.getPhone()); + } catch (OmsException ignore) { + return EMPTY_TAG; + } catch (Exception ignore) { + return null; + } + }); + if (!EMPTY_TAG.equals(userId)) { + userIds .add(userId); + } + }catch (Exception ignore) { + } + }); + userIds.remove(null); + + if (!userIds.isEmpty()) { + String userListStr = SJ.commaJoiner.skipNulls().join(userIds); + List markdownEntities = Lists.newLinkedList(); + markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost())); + markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", alarm.fetchContent())); + + try { + dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId); + }catch (Exception e) { + log.error("[DingTalkAlarmService] send ding message failed, msg is {}", e.getMessage()); + } + } + } + + @PostConstruct + public void init() { + String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID); + String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY); + String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET); + + log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId); + + if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) { + log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable"); + return; + } + if (!StringUtils.isNumeric(agentId)) { + log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId); + return; + } + this.agentId = Long.valueOf(agentId); + dingTalkUtils = new DingTalkUtils(appKey, appSecret); + mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build(); + log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!"); + } + +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/DefaultMailAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java similarity index 87% rename from powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/DefaultMailAlarmService.java rename to powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java index c643ddc0..9a2a0481 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/DefaultMailAlarmService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java @@ -1,6 +1,8 @@ -package com.github.kfcfans.powerjob.server.service.alarm; +package com.github.kfcfans.powerjob.server.service.alarm.impl; import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO; +import com.github.kfcfans.powerjob.server.service.alarm.Alarm; +import com.github.kfcfans.powerjob.server.service.alarm.Alarmable; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; @@ -20,8 +22,8 @@ import java.util.List; * @since 2020/4/30 */ @Slf4j -@Service("omsDefaultMailAlarmService") -public class DefaultMailAlarmService implements Alarmable { +@Service("mailAlarmService") +public class MailAlarmService implements Alarmable { @Resource private Environment environment; diff --git a/powerjob-server/src/main/resources/application-daily.properties b/powerjob-server/src/main/resources/application-daily.properties index 6b6cbba5..5eca462b 100644 --- a/powerjob-server/src/main/resources/application-daily.properties +++ b/powerjob-server/src/main/resources/application-daily.properties @@ -21,6 +21,11 @@ spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true +####### 钉钉报警配置(不需要钉钉报警可以删除以下配置来避免报错) ####### +oms.alarm.ding.app-key=dingauqwkvxxnqskknfv +oms.alarm.ding.app-secret=XWrEPdAZMPgJeFtHuL0LH73LRj-74umF2_0BFcoXMfvnX0pCQvt0rpb1JOJU_HLl +oms.alarm.ding.agent-id=847044348 + ####### 资源清理配置 ####### oms.instanceinfo.retention=1 oms.container.retention.local=1 diff --git a/powerjob-server/src/main/resources/application-pre.properties b/powerjob-server/src/main/resources/application-pre.properties index bbba59e8..04060a2b 100644 --- a/powerjob-server/src/main/resources/application-pre.properties +++ b/powerjob-server/src/main/resources/application-pre.properties @@ -21,6 +21,11 @@ spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true +####### 钉钉报警配置(不需要钉钉报警可以删除以下配置来避免报错) ####### +oms.alarm.ding.app-key=dingauqwkvxxnqskknfv +oms.alarm.ding.app-secret=XWrEPdAZMPgJeFtHuL0LH73LRj-74umF2_0BFcoXMfvnX0pCQvt0rpb1JOJU_HLl +oms.alarm.ding.agent-id=847044348 + ####### 资源清理配置 ####### oms.instanceinfo.retention=3 oms.container.retention.local=3 diff --git a/powerjob-server/src/main/resources/application-product.properties b/powerjob-server/src/main/resources/application-product.properties index 3352aaf2..3dd39cd0 100644 --- a/powerjob-server/src/main/resources/application-product.properties +++ b/powerjob-server/src/main/resources/application-product.properties @@ -21,6 +21,11 @@ spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true +####### 钉钉报警配置(不需要钉钉报警可以删除以下配置来避免报错) ####### +oms.alarm.ding.app-key=dingauqwkvxxnqskknfv +oms.alarm.ding.app-secret=XWrEPdAZMPgJeFtHuL0LH73LRj-74umF2_0BFcoXMfvnX0pCQvt0rpb1JOJU_HLl +oms.alarm.ding.agent-id=847044348 + ####### 资源清理配置 ####### oms.instanceinfo.retention=7 oms.container.retention.local=7 diff --git a/powerjob-server/src/main/resources/application.properties b/powerjob-server/src/main/resources/application.properties index 6dc9fd8b..94e29fe3 100644 --- a/powerjob-server/src/main/resources/application.properties +++ b/powerjob-server/src/main/resources/application.properties @@ -17,6 +17,6 @@ spring.servlet.multipart.max-request-size=209715200 # akka ActorSystem 服务端口 oms.akka.port=10086 # 报警服务 bean名称 -oms.alarm.bean.names=omsDefaultMailAlarmService +oms.alarm.bean.names=mailAlarmService,dingTalkAlarmService # 表前缀(默认无表前缀,有需求直接填入表前缀即可,比如 pj_ ) oms.table-prefix= \ No newline at end of file diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/DingTalkTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/DingTalkTest.java new file mode 100644 index 00000000..4821b89f --- /dev/null +++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/DingTalkTest.java @@ -0,0 +1,37 @@ +package com.github.kfcfans.powerjob.server.test; + +import com.github.kfcfans.powerjob.server.common.utils.DingTalkUtils; +import com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; + +import java.util.List; + +/** + * 测试钉钉消息工具 + * + * @author tjq + * @since 2020/8/8 + */ +public class DingTalkTest { + + private static final Long AGENT_ID = 847044348L; + private static final DingTalkUtils dingTalkUtils = new DingTalkUtils("dingauqwkvxxnqskknfv", "XWrEPdAZMPgJeFtHuL0LH73LRj-74umF2_0BFcoXMfvnX0pCQvt0rpb1JOJU_HLl"); + + @Test + public void testFetchUserId() throws Exception { + System.out.println(dingTalkUtils.fetchUserIdByMobile("38353")); + } + + @Test + public void testSendMarkdown() throws Exception { + String userId = "2159453017839770,1234"; + + List mds = Lists.newLinkedList(); + mds.add(new DingTalkUtils.MarkdownEntity("t111","hahahahahahahha1")); + mds.add(new DingTalkUtils.MarkdownEntity("t2222","hahahahahahahha2")); + mds.add(new DingTalkUtils.MarkdownEntity("t3333","hahahahahahahha3")); + + dingTalkUtils.sendMarkdownAsync("PowerJob AlarmService", mds, userId, AGENT_ID); + } + +} From 55864771f5abae7143ce4a8e41394254f4f98e21 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 8 Aug 2020 13:58:56 +0800 Subject: [PATCH 2/9] [dev] rewrite AlarmService & AlarmCenter --- .../common/PowerJobServerConfigKey.java | 4 - .../common/config/ThreadPoolConfig.java | 14 ---- .../server/service/alarm/AlarmCenter.java | 45 +++++++++++ .../server/service/alarm/Alarmable.java | 7 +- .../service/alarm/OmsCenterAlarmService.java | 77 ------------------- .../alarm/impl/DingTalkAlarmService.java | 2 +- .../service/alarm/impl/MailAlarmService.java | 2 +- .../service/instance/InstanceManager.java | 6 +- .../workflow/WorkflowInstanceManager.java | 7 +- .../src/main/resources/application.properties | 2 - 10 files changed, 57 insertions(+), 109 deletions(-) create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/AlarmCenter.java delete mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/OmsCenterAlarmService.java diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java index f8dcd764..c31f748f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/PowerJobServerConfigKey.java @@ -12,10 +12,6 @@ public class PowerJobServerConfigKey { * akka 端口号 */ public static final String AKKA_PORT = "oms.akka.port"; - /** - * alarm bean 名称,多值逗号分隔 - */ - public static final String ALARM_BEAN_NAMES = "oms.alarm.bean.names"; /** * 自定义数据库表前缀 */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java index fa503d5e..dca75fa7 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java @@ -13,7 +13,6 @@ import java.util.concurrent.*; /** * 公用线程池配置 * omsTimingPool:用于执行定时任务的线程池 - * omsCommonPool:用于执行普通任务的线程池 * omsBackgroundPool:用于执行后台任务的线程池,这类任务对时间不敏感,慢慢执行细水长流即可 * taskScheduler:用于定时调度的线程池 * @@ -42,19 +41,6 @@ public class ThreadPoolConfig { return executor; } - - @Bean("omsCommonPool") - public Executor taskExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); - executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors()); - executor.setQueueCapacity(1024); - executor.setKeepAliveSeconds(60); - executor.setThreadNamePrefix("omsCommonPool-"); - executor.setRejectedExecutionHandler(new LogOnRejected()); - return executor; - } - @Bean("omsBackgroundPool") public Executor initBackgroundPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/AlarmCenter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/AlarmCenter.java new file mode 100644 index 00000000..3e0ee3d8 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/AlarmCenter.java @@ -0,0 +1,45 @@ +package com.github.kfcfans.powerjob.server.service.alarm; + +import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO; +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.*; + +/** + * 报警服务 + * + * @author tjq + * @since 2020/4/19 + */ +@Slf4j +public class AlarmCenter { + + private static final ExecutorService POOL; + private static final List BEANS = Lists.newLinkedList(); + + static { + int cores = Runtime.getRuntime().availableProcessors(); + ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("AlarmPool-%d").build(); + POOL = new ThreadPoolExecutor(cores, cores, 5, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue(), factory); + } + + + public static void alarmFailed(Alarm alarm, List targetUserList) { + POOL.execute(() -> BEANS.forEach(alarmable -> { + try { + alarmable.onFailed(alarm, targetUserList); + }catch (Exception e) { + log.warn("[AlarmCenter] alarm failed.", e); + } + })); + } + + public static void register(Alarmable alarmable) { + BEANS.add(alarmable); + log.info("[AlarmCenter] bean(className={},obj={}) register to AlarmCenter successfully!", alarmable.getClass().getName(), alarmable); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/Alarmable.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/Alarmable.java index d7fa2f43..715bfd7f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/Alarmable.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/Alarmable.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.server.service.alarm; import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO; +import org.springframework.beans.factory.InitializingBean; import java.util.List; @@ -10,8 +11,12 @@ import java.util.List; * @author tjq * @since 2020/4/19 */ -public interface Alarmable { +public interface Alarmable extends InitializingBean { void onFailed(Alarm alarm, List targetUserList); + @Override + default void afterPropertiesSet() throws Exception { + AlarmCenter.register(this); + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/OmsCenterAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/OmsCenterAlarmService.java deleted file mode 100644 index 2f89350f..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/OmsCenterAlarmService.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.github.kfcfans.powerjob.server.service.alarm; - -import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; -import com.github.kfcfans.powerjob.server.common.SJ; -import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; -import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO; -import com.google.common.collect.Lists; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.core.env.Environment; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Service; - -import javax.annotation.Resource; -import java.util.List; - -/** - * 报警服务 - * - * @author tjq - * @since 2020/4/19 - */ -@Slf4j -@Service("omsCenterAlarmService") -public class OmsCenterAlarmService implements Alarmable { - - @Resource - private Environment environment; - - private List alarmableList; - private volatile boolean initialized = false; - - @Async("omsCommonPool") - @Override - public void onFailed(Alarm alarm, List targetUserList) { - init(); - alarmableList.forEach(alarmable -> { - try { - alarmable.onFailed(alarm, targetUserList); - }catch (Exception e) { - log.warn("[OmsCenterAlarmService] alarm failed.", e); - } - }); - } - - /** - * 初始化 - * 使用 InitializingBean 进行初始化会导致 NPE,因为没办法控制Bean(开发者自己实现的Bean)的加载顺序 - */ - private void init() { - - if (initialized) { - return; - } - synchronized (this) { - if (initialized) { - return; - } - - alarmableList = Lists.newLinkedList(); - String beanNames = environment.getProperty(PowerJobServerConfigKey.ALARM_BEAN_NAMES); - if (StringUtils.isNotEmpty(beanNames)) { - SJ.commaSplitter.split(beanNames).forEach(beanName -> { - try { - Alarmable bean = (Alarmable) SpringUtils.getBean(beanName); - alarmableList.add(bean); - log.info("[OmsCenterAlarmService] load Alarmable for bean: {} successfully.", beanName); - }catch (Exception e) { - log.warn("[OmsCenterAlarmService] initialize Alarmable for bean: {} failed.", beanName, e); - } - }); - } - initialized = true; - } - } - -} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java index b40196ad..a9cca5da 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java @@ -29,7 +29,7 @@ import java.util.Set; * @since 2020/8/6 */ @Slf4j -@Service("dingTalkAlarmService") +@Service public class DingTalkAlarmService implements Alarmable { @Resource diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java index 9a2a0481..78ae48cb 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java @@ -22,7 +22,7 @@ import java.util.List; * @since 2020/4/30 */ @Slf4j -@Service("mailAlarmService") +@Service public class MailAlarmService implements Alarmable { @Resource diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java index 6f752a3d..48e825f8 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java @@ -11,7 +11,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn import com.github.kfcfans.powerjob.server.service.DispatchService; import com.github.kfcfans.powerjob.server.service.InstanceLogService; import com.github.kfcfans.powerjob.server.service.UserService; -import com.github.kfcfans.powerjob.server.service.alarm.Alarmable; +import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter; import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarm; import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder; import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager; @@ -38,8 +38,6 @@ public class InstanceManager { private DispatchService dispatchService; @Resource private InstanceLogService instanceLogService; - @Resource(name = "omsCenterAlarmService") - private Alarmable omsCenterAlarmService; @Resource private InstanceMetadataService instanceMetadataService; @Resource @@ -167,7 +165,7 @@ public class InstanceManager { BeanUtils.copyProperties(instanceInfo, content); List userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds()); - omsCenterAlarmService.onFailed(content, userList); + AlarmCenter.alarmFailed(content, userList); } // 主动移除缓存,减小内存占用 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index 1f11b1f2..d9e7fab2 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -19,7 +19,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowIn import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.powerjob.server.service.DispatchService; import com.github.kfcfans.powerjob.server.service.UserService; -import com.github.kfcfans.powerjob.server.service.alarm.Alarmable; +import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter; import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarm; import com.github.kfcfans.powerjob.server.service.id.IdGenerateService; import com.github.kfcfans.powerjob.server.service.instance.InstanceService; @@ -62,9 +62,6 @@ public class WorkflowInstanceManager { @Resource private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; - @Resource(name = "omsCenterAlarmService") - private Alarmable omsCenterAlarmService; - private final SegmentLock segmentLock = new SegmentLock(16); /** @@ -339,7 +336,7 @@ public class WorkflowInstanceManager { content.setResult(result); List userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds()); - omsCenterAlarmService.onFailed(content, userList); + AlarmCenter.alarmFailed(content, userList); }); }catch (Exception ignore) { } diff --git a/powerjob-server/src/main/resources/application.properties b/powerjob-server/src/main/resources/application.properties index 94e29fe3..d91d0684 100644 --- a/powerjob-server/src/main/resources/application.properties +++ b/powerjob-server/src/main/resources/application.properties @@ -16,7 +16,5 @@ spring.servlet.multipart.max-request-size=209715200 ###### PowerJob 自身配置(该配置只允许存在于 application.properties 文件中) ###### # akka ActorSystem 服务端口 oms.akka.port=10086 -# 报警服务 bean名称 -oms.alarm.bean.names=mailAlarmService,dingTalkAlarmService # 表前缀(默认无表前缀,有需求直接填入表前缀即可,比如 pj_ ) oms.table-prefix= \ No newline at end of file From 756adce633a7944d4abbf80074489ce3e2f5a691 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 8 Aug 2020 17:10:05 +0800 Subject: [PATCH 3/9] [dev] update NetUtils, support to choose preferred network interface now --- .../kfcfans/powerjob/common/PowerJobDKey.java | 14 + .../powerjob/common/utils/NetUtils.java | 572 +++++++----------- .../src/test/java/NetUtilsTest.java | 24 + .../server/common/utils/CronExpression.java | 2 +- .../github/kfcfans/powerjob/UtilsTest.java | 10 - 5 files changed, 242 insertions(+), 380 deletions(-) create mode 100644 powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobDKey.java create mode 100644 powerjob-common/src/test/java/NetUtilsTest.java diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobDKey.java new file mode 100644 index 00000000..3bc2c32d --- /dev/null +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobDKey.java @@ -0,0 +1,14 @@ +package com.github.kfcfans.powerjob.common; + +/** + * 通过 JVM 启动参数传入的配置信息 + * + * + * @author tjq + * @since 2020/8/8 + */ +public class PowerJobDKey { + + public static final String PREFERRED_NETWORK_INTERFACE = "powerjob.network.interface.preferred"; + +} diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/NetUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/NetUtils.java index bac13c8c..d26230f9 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/NetUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/NetUtils.java @@ -1,130 +1,195 @@ package com.github.kfcfans.powerjob.common.utils; +/* +Copyright [2020] [PowerJob] +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ + +import com.github.kfcfans.powerjob.common.PowerJobDKey; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.*; -import java.util.Enumeration; -import java.util.Optional; -import java.util.concurrent.ThreadLocalRandom; +import java.util.*; import java.util.regex.Pattern; +import static java.util.Collections.emptyList; + /** * IP and Port Helper for RPC * - * @author tjq borrowed from dubbo - * @since 2020/3/16 + * @author from dubbo, optimize by tjq + * @since 2020/8/8 */ @Slf4j -@SuppressWarnings("all") public class NetUtils { - - private static final String ANYHOST_VALUE = "0.0.0.0"; - private static final String LOCALHOST_KEY = "localhost"; + + private static volatile String HOST_ADDRESS; private static final String LOCALHOST_VALUE = "127.0.0.1"; - - // returned port range is [30000, 39999] - private static final int RND_PORT_START = 30000; - private static final int RND_PORT_RANGE = 10000; - - // valid port range is (0, 65535] - private static final int MIN_PORT = 0; - public static final int MAX_PORT = 65535; - - private static final Pattern ADDRESS_PATTERN = Pattern.compile("^\\d{1,3}(\\.\\d{1,3}){3}\\:\\d{1,5}$"); - private static final Pattern LOCAL_IP_PATTERN = Pattern.compile("127(\\.\\d{1,3}){3}$"); - private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$"); - private static volatile InetAddress LOCAL_ADDRESS = null; + private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$"); + private static final String ANYHOST_VALUE = "0.0.0.0"; - private static final String SPLIT_IPV4_CHARECTER = "\\."; - private static final String SPLIT_IPV6_CHARECTER = ":"; + /** + * 获取本机 IP 地址 + * @return 本机 IP 地址 + */ + public static String getLocalHost() { + if (HOST_ADDRESS != null) { + return HOST_ADDRESS; + } - public static int getRandomPort() { - return RND_PORT_START + ThreadLocalRandom.current().nextInt(RND_PORT_RANGE); + InetAddress address = getLocalAddress(); + if (address != null) { + return HOST_ADDRESS = address.getHostAddress(); + } + return LOCALHOST_VALUE; } - public static int getAvailablePort() { - try (ServerSocket ss = new ServerSocket()) { - ss.bind(null); - return ss.getLocalPort(); - } catch (IOException e) { - return getRandomPort(); + /** + * Find first valid IP from local network card + * + * @return first valid local IP + */ + public static InetAddress getLocalAddress() { + if (LOCAL_ADDRESS != null) { + return LOCAL_ADDRESS; } + InetAddress localAddress = getLocalAddress0(); + LOCAL_ADDRESS = localAddress; + return localAddress; + } + private static InetAddress getLocalAddress0() { + InetAddress localAddress = null; + + // @since 2.7.6, choose the {@link NetworkInterface} first + try { + NetworkInterface networkInterface = findNetworkInterface(); + Enumeration addresses = networkInterface.getInetAddresses(); + while (addresses.hasMoreElements()) { + Optional addressOp = toValidAddress(addresses.nextElement()); + if (addressOp.isPresent()) { + try { + if (addressOp.get().isReachable(100)) { + return addressOp.get(); + } + } catch (IOException e) { + // ignore + } + } + } + } catch (Throwable e) { + log.warn("[Net] getLocalAddress0 failed.", e); + } + + try { + localAddress = InetAddress.getLocalHost(); + Optional addressOp = toValidAddress(localAddress); + if (addressOp.isPresent()) { + return addressOp.get(); + } + } catch (Throwable e) { + log.warn("[Net] getLocalAddress0 failed.", e); + } + + + return localAddress; } - public static int getAvailablePort(int port) { - if (port <= 0) { - return getAvailablePort(); + /** + * Get the suitable {@link NetworkInterface} + * + * @return If no {@link NetworkInterface} is available , return null + * @since 2.7.6 + */ + public static NetworkInterface findNetworkInterface() { + + List validNetworkInterfaces = emptyList(); + try { + validNetworkInterfaces = getValidNetworkInterfaces(); + } catch (Throwable e) { + log.warn("[Net] findNetworkInterface failed", e); } - for (int i = port; i < MAX_PORT; i++) { - try (ServerSocket ss = new ServerSocket(i)) { - return i; - } catch (IOException e) { - // continue + + NetworkInterface result = null; + + // Try to find the preferred one + for (NetworkInterface networkInterface : validNetworkInterfaces) { + if (isPreferredNetworkInterface(networkInterface)) { + result = networkInterface; + log.info("[Net] use preferred network interface: {}", networkInterface.getDisplayName()); + break; } } - return port; - } - public static boolean isInvalidPort(int port) { - return port <= MIN_PORT || port > MAX_PORT; - } - - public static boolean isValidAddress(String address) { - return ADDRESS_PATTERN.matcher(address).matches(); - } - - public static boolean isLocalHost(String host) { - return host != null - && (LOCAL_IP_PATTERN.matcher(host).matches() - || host.equalsIgnoreCase(LOCALHOST_KEY)); - } - - public static boolean isAnyHost(String host) { - return ANYHOST_VALUE.equals(host); - } - - public static boolean isInvalidLocalHost(String host) { - return host == null - || host.length() == 0 - || host.equalsIgnoreCase(LOCALHOST_KEY) - || host.equals(ANYHOST_VALUE) - || (LOCAL_IP_PATTERN.matcher(host).matches()); - } - - public static boolean isValidLocalHost(String host) { - return !isInvalidLocalHost(host); - } - - public static InetSocketAddress getLocalSocketAddress(String host, int port) { - return isInvalidLocalHost(host) ? - new InetSocketAddress(port) : new InetSocketAddress(host, port); - } - - static boolean isValidV4Address(InetAddress address) { - if (address == null || address.isLoopbackAddress()) { - return false; + if (result == null) { // If not found, try to get the first one + for (NetworkInterface networkInterface : validNetworkInterfaces) { + Enumeration addresses = networkInterface.getInetAddresses(); + while (addresses.hasMoreElements()) { + Optional addressOp = toValidAddress(addresses.nextElement()); + if (addressOp.isPresent()) { + try { + if (addressOp.get().isReachable(100)) { + result = networkInterface; + break; + } + } catch (IOException e) { + // ignore + } + } + } + } } - String name = address.getHostAddress(); - boolean result = (name != null - && IP_PATTERN.matcher(name).matches() - && !ANYHOST_VALUE.equals(name) - && !LOCALHOST_VALUE.equals(name)); + + if (result == null) { + result = first(validNetworkInterfaces); + } + return result; } + private static Optional toValidAddress(InetAddress address) { + if (address instanceof Inet6Address) { + Inet6Address v6Address = (Inet6Address) address; + if (isPreferIPV6Address()) { + return Optional.ofNullable(normalizeV6Address(v6Address)); + } + } + if (isValidV4Address(address)) { + return Optional.of(address); + } + return Optional.empty(); + } + /** * Check if an ipv6 address * * @return true if it is reachable */ static boolean isPreferIPV6Address() { - boolean preferIpv6 = Boolean.getBoolean("java.net.preferIPv6Addresses"); - if (!preferIpv6) { + return Boolean.getBoolean("java.net.preferIPv6Addresses"); + } + + static boolean isValidV4Address(InetAddress address) { + if (address == null || address.isLoopbackAddress()) { return false; } - return false; + + String name = address.getHostAddress(); + return (name != null + && IP_PATTERN.matcher(name).matches() + && !ANYHOST_VALUE.equals(name) + && !LOCALHOST_VALUE.equals(name)); } /** @@ -156,299 +221,68 @@ public class NetUtils { } /** - * 获取本机 IP 地址 - * @return 本机IP地址 + * Get the valid {@link NetworkInterface network interfaces} + * + * @return non-null + * @throws SocketException SocketException if an I/O error occurs. + * @since 2.7.6 */ - public static String getLocalHost() { - InetAddress address = getLocalAddress(); - return address == null ? LOCALHOST_VALUE : address.getHostAddress(); - } - - /** - * Find first valid IP from local network card - * @return first valid local IP - */ - public static InetAddress getLocalAddress() { - if (LOCAL_ADDRESS != null) { - return LOCAL_ADDRESS; - } - InetAddress localAddress = getLocalAddress0(); - LOCAL_ADDRESS = localAddress; - return localAddress; - } - - private static Optional toValidAddress(InetAddress address) { - if (address instanceof Inet6Address) { - Inet6Address v6Address = (Inet6Address) address; - if (isPreferIPV6Address()) { - return Optional.ofNullable(normalizeV6Address(v6Address)); - } - } - if (isValidV4Address(address)) { - return Optional.of(address); - } - return Optional.empty(); - } - - private static InetAddress getLocalAddress0() { - InetAddress localAddress = null; - try { - localAddress = InetAddress.getLocalHost(); - Optional addressOp = toValidAddress(localAddress); - if (addressOp.isPresent()) { - return addressOp.get(); - } - } catch (Throwable e) { - log.warn("[Triple]", e); - } - - try { - Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); - if (null == interfaces) { - return localAddress; - } - while (interfaces.hasMoreElements()) { - try { - NetworkInterface network = interfaces.nextElement(); - if (network.isLoopback() || network.isVirtual() || !network.isUp()) { - continue; - } - Enumeration addresses = network.getInetAddresses(); - while (addresses.hasMoreElements()) { - try { - Optional addressOp = toValidAddress(addresses.nextElement()); - if (addressOp.isPresent()) { - try { - if(addressOp.get().isReachable(100)){ - return addressOp.get(); - } - } catch (IOException e) { - // ignore - } - } - } catch (Throwable e) { - log.warn("[Triple]", e); - } - } - } catch (Throwable e) { - log.warn("[Triple]", e); - } - } - } catch (Throwable e) { - log.warn("[Triple]", e); - } - return localAddress; - } - - public static String getHostName(String address) { - try { - int i = address.indexOf(':'); - if (i > -1) { - address = address.substring(0, i); - } - InetAddress inetAddress = InetAddress.getByName(address); - if (inetAddress != null) { - return inetAddress.getHostName(); - } - } catch (Throwable e) { - // ignore - } - return address; - } - - /** - * getIpByHost - * @param hostName hostName - * @return ip address or hostName if UnknownHostException - */ - public static String getIpByHost(String hostName) { - try { - return InetAddress.getByName(hostName).getHostAddress(); - } catch (UnknownHostException e) { - return hostName; - } - } - - public static String toAddressString(InetSocketAddress address) { - return address.getAddress().getHostAddress() + ":" + address.getPort(); - } - - public static InetSocketAddress toAddress(String address) { - int i = address.indexOf(':'); - String host; - int port; - if (i > -1) { - host = address.substring(0, i); - port = Integer.parseInt(address.substring(i + 1)); - } else { - host = address; - port = 0; - } - return new InetSocketAddress(host, port); - } - - public static String toURL(String protocol, String host, int port, String path) { - StringBuilder sb = new StringBuilder(); - sb.append(protocol).append("://"); - sb.append(host).append(':').append(port); - if (path.charAt(0) != '/') { - sb.append('/'); - } - sb.append(path); - return sb.toString(); - } - - public static void joinMulticastGroup(MulticastSocket multicastSocket, InetAddress multicastAddress) throws IOException { - setInterface(multicastSocket, multicastAddress instanceof Inet6Address); - multicastSocket.setLoopbackMode(false); - multicastSocket.joinGroup(multicastAddress); - } - - public static void setInterface(MulticastSocket multicastSocket, boolean preferIpv6) throws IOException { - boolean interfaceSet = false; - Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); + private static List getValidNetworkInterfaces() throws SocketException { + List validNetworkInterfaces = new LinkedList<>(); + Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); while (interfaces.hasMoreElements()) { - NetworkInterface i = (NetworkInterface) interfaces.nextElement(); - Enumeration addresses = i.getInetAddresses(); - while (addresses.hasMoreElements()) { - InetAddress address = (InetAddress) addresses.nextElement(); - if (preferIpv6 && address instanceof Inet6Address) { - try { - if(address.isReachable(100)){ - multicastSocket.setInterface(address); - interfaceSet = true; - break; - } - } catch (IOException e) { - // ignore - } - } else if (!preferIpv6 && address instanceof Inet4Address) { - try { - if(address.isReachable(100)){ - multicastSocket.setInterface(address); - interfaceSet = true; - break; - } - } catch (IOException e) { - // ignore - } - } - } - if (interfaceSet) { - break; - } - } - } - - - public static boolean matchIpRange(String pattern, String host, int port) throws UnknownHostException { - if (pattern == null || host == null) { - throw new IllegalArgumentException("Illegal Argument pattern or hostName. Pattern:" + pattern + ", Host:" + host); - } - pattern = pattern.trim(); - if ("*.*.*.*".equals(pattern) || "*".equals(pattern)) { - return true; - } - - InetAddress inetAddress = InetAddress.getByName(host); - boolean isIpv4 = isValidV4Address(inetAddress) ? true : false; - String[] hostAndPort = getPatternHostAndPort(pattern, isIpv4); - if (hostAndPort[1] != null && !hostAndPort[1].equals(String.valueOf(port))) { - return false; - } - pattern = hostAndPort[0]; - - String splitCharacter = SPLIT_IPV4_CHARECTER; - if (!isIpv4) { - splitCharacter = SPLIT_IPV6_CHARECTER; - } - String[] mask = pattern.split(splitCharacter); - //check format of pattern - checkHostPattern(pattern, mask, isIpv4); - - host = inetAddress.getHostAddress(); - - String[] ipAddress = host.split(splitCharacter); - if (pattern.equals(host)) { - return true; - } - // short name condition - if (!ipPatternContainExpression(pattern)) { - InetAddress patternAddress = InetAddress.getByName(pattern); - if (patternAddress.getHostAddress().equals(host)) { - return true; - } else { - return false; - } - } - for (int i = 0; i < mask.length; i++) { - if ("*".equals(mask[i]) || mask[i].equals(ipAddress[i])) { + NetworkInterface networkInterface = interfaces.nextElement(); + if (ignoreNetworkInterface(networkInterface)) { // ignore continue; - } else if (mask[i].contains("-")) { - String[] rangeNumStrs = mask[i].split("-"); - if (rangeNumStrs.length != 2) { - throw new IllegalArgumentException("There is wrong format of ip Address: " + mask[i]); - } - Integer min = getNumOfIpSegment(rangeNumStrs[0], isIpv4); - Integer max = getNumOfIpSegment(rangeNumStrs[1], isIpv4); - Integer ip = getNumOfIpSegment(ipAddress[i], isIpv4); - if (ip < min || ip > max) { - return false; - } - } else if ("0".equals(ipAddress[i]) && ("0".equals(mask[i]) || "00".equals(mask[i]) || "000".equals(mask[i]) || "0000".equals(mask[i]))) { - continue; - } else if (!mask[i].equals(ipAddress[i])) { - return false; } + validNetworkInterfaces.add(networkInterface); } - return true; + return validNetworkInterfaces; } - private static boolean ipPatternContainExpression(String pattern) { - return pattern.contains("*") || pattern.contains("-"); + /** + * @param networkInterface {@link NetworkInterface} + * @return if the specified {@link NetworkInterface} should be ignored, return true + * @throws SocketException SocketException if an I/O error occurs. + * @since 2.7.6 + */ + private static boolean ignoreNetworkInterface(NetworkInterface networkInterface) throws SocketException { + return networkInterface == null + || networkInterface.isLoopback() + || networkInterface.isVirtual() + || !networkInterface.isUp(); } - private static void checkHostPattern(String pattern, String[] mask, boolean isIpv4) { - if (!isIpv4) { - if (mask.length != 8 && ipPatternContainExpression(pattern)) { - throw new IllegalArgumentException("If you config ip expression that contains '*' or '-', please fill qulified ip pattern like 234e:0:4567:0:0:0:3d:*. "); - } - if (mask.length != 8 && !pattern.contains("::")) { - throw new IllegalArgumentException("The host is ipv6, but the pattern is not ipv6 pattern : " + pattern); - } + /** + * Take the first element from the specified collection + * + * @param values the collection object + * @param the type of element of collection + * @return if found, return the first one, or null + * @since 2.7.6 + */ + public static T first(Collection values) { + if (values == null || values.isEmpty()) { + return null; + } + if (values instanceof List) { + List list = (List) values; + return list.get(0); } else { - if (mask.length != 4) { - throw new IllegalArgumentException("The host is ipv4, but the pattern is not ipv4 pattern : " + pattern); - } + return values.iterator().next(); } } - private static String[] getPatternHostAndPort(String pattern, boolean isIpv4) { - String[] result = new String[2]; - if (pattern.startsWith("[") && pattern.contains("]:")) { - int end = pattern.indexOf("]:"); - result[0] = pattern.substring(1, end); - result[1] = pattern.substring(end + 2); - return result; - } else if (pattern.startsWith("[") && pattern.endsWith("]")) { - result[0] = pattern.substring(1, pattern.length() - 1); - result[1] = null; - return result; - } else if (isIpv4 && pattern.contains(":")) { - int end = pattern.indexOf(":"); - result[0] = pattern.substring(0, end); - result[1] = pattern.substring(end + 1); - return result; - } else { - result[0] = pattern; - return result; - } - } - - private static Integer getNumOfIpSegment(String ipSegment, boolean isIpv4) { - if (isIpv4) { - return Integer.parseInt(ipSegment); - } - return Integer.parseInt(ipSegment, 16); + /** + * Is preferred {@link NetworkInterface} or not + * + * @param networkInterface {@link NetworkInterface} + * @return if the name of the specified {@link NetworkInterface} matches + * the property value from {@link com.github.kfcfans.powerjob.common.PowerJobDKey#PREFERRED_NETWORK_INTERFACE}, return true, + * or false + */ + public static boolean isPreferredNetworkInterface(NetworkInterface networkInterface) { + String preferredNetworkInterface = System.getProperty(PowerJobDKey.PREFERRED_NETWORK_INTERFACE); + return Objects.equals(networkInterface.getDisplayName(), preferredNetworkInterface); } } diff --git a/powerjob-common/src/test/java/NetUtilsTest.java b/powerjob-common/src/test/java/NetUtilsTest.java new file mode 100644 index 00000000..a586453e --- /dev/null +++ b/powerjob-common/src/test/java/NetUtilsTest.java @@ -0,0 +1,24 @@ +import com.github.kfcfans.powerjob.common.PowerJobDKey; +import com.github.kfcfans.powerjob.common.utils.NetUtils; +import org.junit.jupiter.api.Test; + +/** + * NetUtilsTest + * + * @author tjq + * @since 2020/8/8 + */ +public class NetUtilsTest { + + @Test + public void testOrigin() { + System.out.println(NetUtils.getLocalHost()); + } + + @Test + public void testPreferredNetworkInterface() { + System.setProperty(PowerJobDKey.PREFERRED_NETWORK_INTERFACE, "en5"); + System.out.println(NetUtils.getLocalHost()); + } + +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/CronExpression.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/CronExpression.java index de59f0f5..a9c16fe3 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/CronExpression.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/CronExpression.java @@ -1,6 +1,6 @@ package com.github.kfcfans.powerjob.server.common.utils; /* -Copyright [2020] [KFCFans] +Copyright [2020] [PowerJob] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/UtilsTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/UtilsTest.java index 8a72fd04..329f0ee5 100644 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/UtilsTest.java +++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/UtilsTest.java @@ -1,6 +1,5 @@ package com.github.kfcfans.powerjob; -import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.worker.common.utils.SystemInfoUtils; import org.junit.jupiter.api.Test; @@ -12,19 +11,10 @@ import org.junit.jupiter.api.Test; */ public class UtilsTest { - @Test - public void testNetUtils() { - System.out.println("本机IP:" + NetUtils.getLocalHost()); - System.out.println("端口:" + NetUtils.getAvailablePort(7777)); - } @Test public void testSystemInfoUtils() { System.out.println(SystemInfoUtils.getSystemMetrics()); } - @Test - public void testSerializeUtils() { - - } } From ae18c5fd7c32200a374856bd55934c4a25421425 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 8 Aug 2020 17:27:27 +0800 Subject: [PATCH 4/9] [dev] update NetUtils, support ignore network interface now --- .../kfcfans/powerjob/common/PowerJobDKey.java | 10 ++++++++++ .../powerjob/common/utils/NetUtils.java | 17 ++++++++++++++++ .../src/test/java/NetUtilsTest.java | 6 ++++++ .../github/kfcfans/powerjob/UtilsTest.java | 20 ------------------- 4 files changed, 33 insertions(+), 20 deletions(-) delete mode 100644 powerjob-worker/src/test/java/com/github/kfcfans/powerjob/UtilsTest.java diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobDKey.java index 3bc2c32d..b849b9e6 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobDKey.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobDKey.java @@ -1,5 +1,7 @@ package com.github.kfcfans.powerjob.common; +import java.net.NetworkInterface; + /** * 通过 JVM 启动参数传入的配置信息 * @@ -9,6 +11,14 @@ package com.github.kfcfans.powerjob.common; */ public class PowerJobDKey { + /** + * The property name for {@link NetworkInterface#getDisplayName() the name of network interface} that the PowerJob application prefers + */ public static final String PREFERRED_NETWORK_INTERFACE = "powerjob.network.interface.preferred"; + /** + * Java regular expressions for network interfaces that will be ignored. + */ + public static final String IGNORED_NETWORK_INTERFACE_REGEX = "powerjob.network.interface.ignored"; + } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/NetUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/NetUtils.java index d26230f9..8b8c4817 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/NetUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/NetUtils.java @@ -17,6 +17,7 @@ limitations under the License. import com.github.kfcfans.powerjob.common.PowerJobDKey; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import java.io.IOException; import java.net.*; @@ -235,6 +236,10 @@ public class NetUtils { if (ignoreNetworkInterface(networkInterface)) { // ignore continue; } + // 根据用户 -D 参数忽略网卡 + if (ignoreInterfaceByConfig(networkInterface.getDisplayName())) { + continue; + } validNetworkInterfaces.add(networkInterface); } return validNetworkInterfaces; @@ -285,4 +290,16 @@ public class NetUtils { String preferredNetworkInterface = System.getProperty(PowerJobDKey.PREFERRED_NETWORK_INTERFACE); return Objects.equals(networkInterface.getDisplayName(), preferredNetworkInterface); } + + static boolean ignoreInterfaceByConfig(String interfaceName) { + String regex = System.getProperty(PowerJobDKey.IGNORED_NETWORK_INTERFACE_REGEX); + if (StringUtils.isBlank(regex)) { + return false; + } + if (interfaceName.matches(regex)) { + log.info("[Net] ignore network interface: {} by regex({})", interfaceName, regex); + return true; + } + return false; + } } diff --git a/powerjob-common/src/test/java/NetUtilsTest.java b/powerjob-common/src/test/java/NetUtilsTest.java index a586453e..410c1f06 100644 --- a/powerjob-common/src/test/java/NetUtilsTest.java +++ b/powerjob-common/src/test/java/NetUtilsTest.java @@ -21,4 +21,10 @@ public class NetUtilsTest { System.out.println(NetUtils.getLocalHost()); } + @Test + public void testIgnoredNetworkInterface() { + System.setProperty(PowerJobDKey.IGNORED_NETWORK_INTERFACE_REGEX, "utun.|llw."); + System.out.println(NetUtils.getLocalHost()); + } + } diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/UtilsTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/UtilsTest.java deleted file mode 100644 index 329f0ee5..00000000 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/UtilsTest.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.github.kfcfans.powerjob; - -import com.github.kfcfans.powerjob.worker.common.utils.SystemInfoUtils; -import org.junit.jupiter.api.Test; - -/** - * 测试工具类 - * - * @author tjq - * @since 2020/3/24 - */ -public class UtilsTest { - - - @Test - public void testSystemInfoUtils() { - System.out.println(SystemInfoUtils.getSystemMetrics()); - } - -} From 5b4591c8b13c069e07461e7bba73742bb206034f Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 8 Aug 2020 23:19:18 +0800 Subject: [PATCH 5/9] [dev] change version to 3.2.3 & optimize MailAlarmService's log --- powerjob-client/pom.xml | 4 ++-- powerjob-common/pom.xml | 2 +- .../com/github/kfcfans/powerjob/common/OmsConstant.java | 3 +++ powerjob-server/pom.xml | 4 ++-- .../github/kfcfans/powerjob/server/service/alarm/Alarm.java | 3 ++- .../server/service/alarm/impl/DingTalkAlarmService.java | 6 ++++-- .../server/service/alarm/impl/MailAlarmService.java | 2 +- powerjob-worker-agent/pom.xml | 4 ++-- powerjob-worker-samples/pom.xml | 4 ++-- powerjob-worker-spring-boot-starter/pom.xml | 4 ++-- powerjob-worker/pom.xml | 4 ++-- 11 files changed, 23 insertions(+), 17 deletions(-) diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index d464c9d4..d1e080d8 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-client - 3.2.2 + 3.2.3 jar - 3.2.2 + 3.2.3 5.6.1 diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index 9aa123c9..8aec6af7 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.2.2 + 3.2.3 jar diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsConstant.java index 0bbb4b9c..ed1fd621 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsConstant.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsConstant.java @@ -12,4 +12,7 @@ public class OmsConstant { public static final String TIME_PATTERN_PLUS = "yyyy-MM-dd HH:mm:ss.SSS"; public static final String NONE = "N/A"; + + public static final String COMMA = ","; + public static final String LINE_SEPARATOR = "\r\n"; } diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 2c5fce49..9b2b2378 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.2.2 + 3.2.3 jar 2.9.2 2.2.6.RELEASE - 3.2.2 + 3.2.3 8.0.19 19.7.0.0 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/Alarm.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/Alarm.java index 1187d41e..be669ad9 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/Alarm.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/Alarm.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.server.service.alarm; import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.powerjob.common.OmsConstant; import com.github.kfcfans.powerjob.common.OmsSerializable; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import org.apache.commons.lang3.StringUtils; @@ -29,7 +30,7 @@ public interface Alarm extends OmsSerializable { }catch (Exception ignore) { } } - sb.append(word).append("\n\r"); + sb.append(word).append(OmsConstant.LINE_SEPARATOR); }); return sb.toString(); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java index a9cca5da..eedbf5ac 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/DingTalkAlarmService.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.server.service.alarm.impl; +import com.github.kfcfans.powerjob.common.OmsConstant; import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; @@ -72,12 +73,13 @@ public class DingTalkAlarmService implements Alarmable { String userListStr = SJ.commaJoiner.skipNulls().join(userIds); List markdownEntities = Lists.newLinkedList(); markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost())); - markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", alarm.fetchContent())); + String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA); + markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content)); try { dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId); }catch (Exception e) { - log.error("[DingTalkAlarmService] send ding message failed, msg is {}", e.getMessage()); + log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage()); } } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java index 78ae48cb..ab9a346a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/impl/MailAlarmService.java @@ -49,7 +49,7 @@ public class MailAlarmService implements Alarmable { javaMailSender.send(sm); }catch (Exception e) { - log.error("[OmsMailAlarmService] send mail({}) failed, reason is {}", sm, e.getMessage()); + log.error("[MailAlarmService] send mail failed, reason is {}", e.getMessage()); } } diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index 92ccce1c..68ee9f0c 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker-agent - 3.2.2 + 3.2.3 jar - 3.2.2-bugfix + 3.2.3 1.2.3 4.3.2 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index 2a65a060..80a5d7e2 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-samples - 3.2.2 + 3.2.3 2.2.6.RELEASE - 3.2.2-bugfix + 3.2.3 1.2.68 diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index 2f6d520a..60a42a73 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-spring-boot-starter - 3.2.2-bugfix + 3.2.3 jar - 3.2.2-bugfix + 3.2.3 2.2.6.RELEASE diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index 8c58ae13..e0ac3936 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker - 3.2.2-bugfix + 3.2.3 jar 5.2.4.RELEASE - 3.2.2 + 3.2.3 1.4.200 3.4.2 5.6.1 From 44ae8608ff8143dd88ca027cd62956fc2268a545 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 9 Aug 2020 13:40:09 +0800 Subject: [PATCH 6/9] [dev] optimize log & alomost release 3.2.3 --- .../kfcfans/powerjob/server/common/utils/DingTalkUtils.java | 1 + .../kfcfans/powerjob/server/service/alarm/AlarmCenter.java | 3 ++- .../src/main/resources/application-product.properties | 6 +++--- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java index a89ad17c..366800d6 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/DingTalkUtils.java @@ -90,6 +90,7 @@ public class DingTalkUtils implements Closeable { if (execute.isSuccess()) { return execute.getUserid(); } + log.info("[DingTalkUtils] fetch userId by mobile({}) failed,reason is {}.", mobile, execute.getErrmsg()); throw new OmsException("fetch userId by phone number failed, reason is " + execute.getErrmsg()); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/AlarmCenter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/AlarmCenter.java index 3e0ee3d8..06bd79b3 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/AlarmCenter.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/AlarmCenter.java @@ -20,11 +20,12 @@ public class AlarmCenter { private static final ExecutorService POOL; private static final List BEANS = Lists.newLinkedList(); + private static final int THREAD_KEEP_ALIVE_TIME_M = 5; static { int cores = Runtime.getRuntime().availableProcessors(); ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("AlarmPool-%d").build(); - POOL = new ThreadPoolExecutor(cores, cores, 5, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue(), factory); + POOL = new ThreadPoolExecutor(cores, cores, THREAD_KEEP_ALIVE_TIME_M, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue(), factory); } diff --git a/powerjob-server/src/main/resources/application-product.properties b/powerjob-server/src/main/resources/application-product.properties index 3dd39cd0..768aad92 100644 --- a/powerjob-server/src/main/resources/application-product.properties +++ b/powerjob-server/src/main/resources/application-product.properties @@ -22,9 +22,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true ####### 钉钉报警配置(不需要钉钉报警可以删除以下配置来避免报错) ####### -oms.alarm.ding.app-key=dingauqwkvxxnqskknfv -oms.alarm.ding.app-secret=XWrEPdAZMPgJeFtHuL0LH73LRj-74umF2_0BFcoXMfvnX0pCQvt0rpb1JOJU_HLl -oms.alarm.ding.agent-id=847044348 +oms.alarm.ding.app-key= +oms.alarm.ding.app-secret= +oms.alarm.ding.agent-id= ####### 资源清理配置 ####### oms.instanceinfo.retention=7 From 05a6016945ec8b9b942595a4a362a53bd77c151d Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 9 Aug 2020 18:13:20 +0800 Subject: [PATCH 7/9] [opt] %logger{50} -> %logger{20} --- powerjob-server/src/main/resources/logback-dev.xml | 2 +- powerjob-server/src/main/resources/logback-product.xml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/powerjob-server/src/main/resources/logback-dev.xml b/powerjob-server/src/main/resources/logback-dev.xml index 94e07a92..ec9b4011 100644 --- a/powerjob-server/src/main/resources/logback-dev.xml +++ b/powerjob-server/src/main/resources/logback-dev.xml @@ -10,7 +10,7 @@ converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/> + value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{20}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/> diff --git a/powerjob-server/src/main/resources/logback-product.xml b/powerjob-server/src/main/resources/logback-product.xml index b14658f3..4fc5ea1e 100644 --- a/powerjob-server/src/main/resources/logback-product.xml +++ b/powerjob-server/src/main/resources/logback-product.xml @@ -16,7 +16,7 @@ 7 - %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{20} - %msg%n UTF-8 @@ -53,7 +53,7 @@ 7 - %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{20} - %msg%n UTF-8 true From 76ed28003fa85070689327fb5672d64af25f434c Mon Sep 17 00:00:00 2001 From: songyinyin Date: Sun, 9 Aug 2020 23:04:06 +0800 Subject: [PATCH 8/9] [dev] worker properties change from powerjob.xxx to powerjob.worker.xxx & If serverAddress is not exist, it will not start PowerJob worker --- .../src/main/resources/application.properties | 8 +- .../PowerJobAutoConfiguration.java | 36 ++++- .../autoconfigure/PowerJobProperties.java | 135 ++++++++++++++---- .../spring-configuration-metadata.json | 98 ++++++++++--- 4 files changed, 222 insertions(+), 55 deletions(-) diff --git a/powerjob-worker-samples/src/main/resources/application.properties b/powerjob-worker-samples/src/main/resources/application.properties index ef35ebd2..7c3ff3fa 100644 --- a/powerjob-worker-samples/src/main/resources/application.properties +++ b/powerjob-worker-samples/src/main/resources/application.properties @@ -4,10 +4,10 @@ spring.jpa.open-in-view=false ########### powerjob-worker 配置 ########### # akka 工作端口,可选,默认 27777 -powerjob.akka-port=27777 +powerjob.worker.akka-port=27777 # 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称 -powerjob.app-name=powerjob-agent-test +powerjob.worker.app-name=powerjob-agent-test # 调度服务器地址,IP:Port 或 域名,多值逗号分隔 -powerjob.server-address=127.0.0.1:7700,127.0.0.1:7701 +powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701 # 持久化方式,可选,默认 disk -powerjob.store-strategy=disk \ No newline at end of file +powerjob.worker.store-strategy=disk \ No newline at end of file diff --git a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java index 6f6000c4..31615b54 100644 --- a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java +++ b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java @@ -3,9 +3,12 @@ package com.github.kfcfans.powerjob.worker.autoconfigure; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.OhMyConfig; +import org.springframework.boot.autoconfigure.condition.AnyNestedCondition; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import java.util.Arrays; @@ -19,32 +22,53 @@ import java.util.List; */ @Configuration @EnableConfigurationProperties(PowerJobProperties.class) +@Conditional(PowerJobAutoConfiguration.PowerJobWorkerCondition.class) public class PowerJobAutoConfiguration { @Bean @ConditionalOnMissingBean public OhMyWorker initPowerJob(PowerJobProperties properties) { + PowerJobProperties.Worker worker = properties.getWorker(); + // 服务器HTTP地址(端口号为 server.port,而不是 ActorSystem port),请勿添加任何前缀(http://) - CommonUtils.requireNonNull(properties.getServerAddress(), "serverAddress can't be empty!"); - List serverAddress = Arrays.asList(properties.getServerAddress().split(",")); + CommonUtils.requireNonNull(worker.getServerAddress(), "serverAddress can't be empty!"); + List serverAddress = Arrays.asList(worker.getServerAddress().split(",")); // 1. 创建配置文件 OhMyConfig config = new OhMyConfig(); // 可以不显式设置,默认值 27777 - config.setPort(properties.getAkkaPort()); + config.setPort(worker.getAkkaPort()); // appName,需要提前在控制台注册,否则启动报错 - config.setAppName(properties.getAppName()); + config.setAppName(worker.getAppName()); config.setServerAddress(serverAddress); // 如果没有大型 Map/MapReduce 的需求,建议使用内存来加速计算 // 有大型 Map/MapReduce 需求,可能产生大量子任务(Task)的场景,请使用 DISK,否则妥妥的 OutOfMemory - config.setStoreStrategy(properties.getStoreStrategy()); + config.setStoreStrategy(worker.getStoreStrategy()); // 启动测试模式,true情况下,不再尝试连接 server 并验证appName - config.setEnableTestMode(properties.isEnableTestMode()); + config.setEnableTestMode(worker.isEnableTestMode()); // 2. 创建 Worker 对象,设置配置文件 OhMyWorker ohMyWorker = new OhMyWorker(); ohMyWorker.setConfig(config); return ohMyWorker; } + + static class PowerJobWorkerCondition extends AnyNestedCondition { + + public PowerJobWorkerCondition() { + super(ConfigurationPhase.PARSE_CONFIGURATION); + } + + @Deprecated + @ConditionalOnProperty(prefix = "powerjob", name = "server-address") + static class PowerJobProperty { + + } + + @ConditionalOnProperty(prefix = "powerjob.worker", name = "server-address") + static class PowerJobWorkerProperty { + + } + } } diff --git a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java index b8cf30b9..480e9e8d 100644 --- a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java +++ b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java @@ -3,8 +3,10 @@ package com.github.kfcfans.powerjob.worker.autoconfigure; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; -import lombok.Data; +import lombok.Getter; +import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.DeprecatedConfigurationProperty; /** * PowerJob 配置项 @@ -12,33 +14,114 @@ import org.springframework.boot.context.properties.ConfigurationProperties; * @author songyinyin * @since 2020/7/26 16:37 */ -@Data @ConfigurationProperties(prefix = "powerjob") public class PowerJobProperties { + + private final Worker worker = new Worker(); + + public Worker getWorker() { + return worker; + } + + @Deprecated + @DeprecatedConfigurationProperty(replacement = "powerjob.worker.app-name") + public String getAppName() { + return getWorker().appName; + } + + @Deprecated + public void setAppName(String appName) { + getWorker().setAppName(appName); + } + + @Deprecated + @DeprecatedConfigurationProperty(replacement = "powerjob.worker.akka-port") + public int getAkkaPort() { + return getWorker().akkaPort; + } + + @Deprecated + public void setAkkaPort(int akkaPort) { + getWorker().setAkkaPort(akkaPort); + } + + @Deprecated + @DeprecatedConfigurationProperty(replacement = "powerjob.worker.server-address") + public String getServerAddress() { + return getWorker().serverAddress; + } + + @Deprecated + public void setServerAddress(String serverAddress) { + getWorker().setServerAddress(serverAddress); + } + + @Deprecated + @DeprecatedConfigurationProperty(replacement = "powerjob.worker.store-strategy") + public StoreStrategy getStoreStrategy() { + return getWorker().storeStrategy; + } + + @Deprecated + public void setStoreStrategy(StoreStrategy storeStrategy) { + getWorker().setStoreStrategy(storeStrategy); + } + + @Deprecated + @DeprecatedConfigurationProperty(replacement = "powerjob.worker.max-result-length") + public int getMaxResultLength() { + return getWorker().maxResultLength; + } + + @Deprecated + public void setMaxResultLength(int maxResultLength) { + getWorker().setMaxResultLength(maxResultLength); + } + + @Deprecated + @DeprecatedConfigurationProperty(replacement = "powerjob.worker.enable-test-mode") + public boolean isEnableTestMode() { + return getWorker().enableTestMode; + } + + @Deprecated + public void setEnableTestMode(boolean enableTestMode) { + getWorker().setEnableTestMode(enableTestMode); + } + + + /** - * 应用名称,需要提前在控制台注册,否则启动报错 + * 客户端 配置项 */ - private String appName; - /** - * 启动 akka 端口 - */ - private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT; - /** - * 调度服务器地址,ip:port 或 域名,多个用英文逗号分隔 - */ - private String serverAddress; - /** - * 本地持久化方式,默认使用磁盘 - */ - private StoreStrategy storeStrategy = StoreStrategy.DISK; - /** - * 最大返回值长度,超过会被截断 - * {@link ProcessResult}#msg 的最大长度 - */ - private int maxResultLength = 8096; - /** - * 启动测试模式,true情况下,不再尝试连接 server 并验证appName。 - * true -> 用于本地写单元测试调试; false -> 默认值,标准模式 - */ - private boolean enableTestMode = false; + @Setter + @Getter + public static class Worker { + /** + * 应用名称,需要提前在控制台注册,否则启动报错 + */ + private String appName; + /** + * 启动 akka 端口 + */ + private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT; + /** + * 调度服务器地址,ip:port 或 域名,多个用英文逗号分隔 + */ + private String serverAddress; + /** + * 本地持久化方式,默认使用磁盘 + */ + private StoreStrategy storeStrategy = StoreStrategy.DISK; + /** + * 最大返回值长度,超过会被截断 + * {@link ProcessResult}#msg 的最大长度 + */ + private int maxResultLength = 8096; + /** + * 启动测试模式,true情况下,不再尝试连接 server 并验证appName。 + * true -> 用于本地写单元测试调试; false -> 默认值,标准模式 + */ + private boolean enableTestMode = false; + } } diff --git a/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json b/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json index 1fbedb0a..5fe74cec 100644 --- a/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json +++ b/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json @@ -4,46 +4,106 @@ "name": "powerjob", "type": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" + }, + { + "name": "powerjob.worker", + "type": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "sourceMethod": "getWorker()" } ], "properties": [ { - "name": "powerjob.app-name", - "type": "java.lang.String", - "description": "应用名称,需要提前在控制台注册,否则启动报错", - "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" + "name": "powerjob.worker.akka-port", + "type": "java.lang.Integer", + "description": "启动 akka 端口", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" }, { - "name": "powerjob.max-result-length", + "name": "powerjob.worker.app-name", + "type": "java.lang.String", + "description": "应用名称,需要提前在控制台注册,否则启动报错", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" + }, + { + "name": "powerjob.worker.enable-test-mode", + "type": "java.lang.Boolean", + "description": "启动测试模式,true情况下,不再尝试连接 server 并验证appName。 true -> 用于本地写单元测试调试; false -> 默认值,标准模式", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker", + "defaultValue": false + }, + { + "name": "powerjob.worker.max-result-length", "type": "java.lang.Integer", "description": "最大返回值长度,超过会被截断 {@link ProcessResult}#msg 的最大长度", - "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker", "defaultValue": 8096 }, + { + "name": "powerjob.worker.server-address", + "type": "java.lang.String", + "description": "调度服务器地址,ip:port 或 域名,多个用英文逗号分隔", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" + }, + { + "name": "powerjob.worker.store-strategy", + "type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy", + "description": "本地持久化方式,默认使用磁盘", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" + }, { "name": "powerjob.akka-port", "type": "java.lang.Integer", - "description": "启动 akka 端口", - "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "deprecated": true, + "deprecation": { + "replacement": "powerjob.worker.akka-port" + } }, { - "name": "powerjob.server-address", + "name": "powerjob.app-name", "type": "java.lang.String", - "description": "调度服务器地址,ip:port 或 域名,多值用英文逗号分隔", - "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" - }, - { - "name": "powerjob.store-strategy", - "type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy", - "description": "本地持久化方式,默认使用磁盘", - "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties" + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "deprecated": true, + "deprecation": { + "replacement": "powerjob.worker.app-name" + } }, { "name": "powerjob.enable-test-mode", "type": "java.lang.Boolean", - "description": "启动测试模式,true情况下,不再尝试连接 server 并验证appName。true -> 用于本地写单元测试调试; false -> 默认值,标准模式", "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", - "defaultValue": false + "deprecated": true, + "deprecation": { + "replacement": "powerjob.worker.enable-test-mode" + } + }, + { + "name": "powerjob.max-result-length", + "type": "java.lang.Integer", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "deprecated": true, + "deprecation": { + "replacement": "powerjob.worker.max-result-length" + } + }, + { + "name": "powerjob.server-address", + "type": "java.lang.String", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "deprecated": true, + "deprecation": { + "replacement": "powerjob.worker.server-address" + } + }, + { + "name": "powerjob.store-strategy", + "type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties", + "deprecated": true, + "deprecation": { + "replacement": "powerjob.worker.store-strategy" + } } ], "hints": [] From f36c06c26c06632ea86f4e822bd6fcb15f3b4a03 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 9 Aug 2020 23:36:37 +0800 Subject: [PATCH 9/9] [release] v3.2.3 --- .../src/main/resources/application.properties | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/powerjob-worker-samples/src/main/resources/application.properties b/powerjob-worker-samples/src/main/resources/application.properties index 7c3ff3fa..17bfa59f 100644 --- a/powerjob-worker-samples/src/main/resources/application.properties +++ b/powerjob-worker-samples/src/main/resources/application.properties @@ -2,7 +2,7 @@ server.port=8081 spring.jpa.open-in-view=false -########### powerjob-worker 配置 ########### +########### powerjob-worker 配置(老配置 powerjob.xxx 即将废弃,请使用 powerjob.worker.xxx) ########### # akka 工作端口,可选,默认 27777 powerjob.worker.akka-port=27777 # 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称 @@ -10,4 +10,6 @@ powerjob.worker.app-name=powerjob-agent-test # 调度服务器地址,IP:Port 或 域名,多值逗号分隔 powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701 # 持久化方式,可选,默认 disk -powerjob.worker.store-strategy=disk \ No newline at end of file +powerjob.worker.store-strategy=disk +# 返回值最大长度,默认 8096 +powerjob.worker.max-result-length=4096 \ No newline at end of file