[release] v3.2.3

This commit is contained in:
tjq 2020-08-09 23:40:11 +08:00
commit 79176c4cab
35 changed files with 892 additions and 585 deletions

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.2.2</version>
<version>3.2.3</version>
<packaging>jar</packaging>
<properties>
<powerjob.common.version>3.2.2</powerjob.common.version>
<powerjob.common.version>3.2.3</powerjob.common.version>
<junit.version>5.6.1</junit.version>
</properties>

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.2.2</version>
<version>3.2.3</version>
<packaging>jar</packaging>
<properties>

View File

@ -12,4 +12,7 @@ public class OmsConstant {
public static final String TIME_PATTERN_PLUS = "yyyy-MM-dd HH:mm:ss.SSS";
public static final String NONE = "N/A";
public static final String COMMA = ",";
public static final String LINE_SEPARATOR = "\r\n";
}

View File

@ -0,0 +1,24 @@
package com.github.kfcfans.powerjob.common;
import java.net.NetworkInterface;
/**
* 通过 JVM 启动参数传入的配置信息
*
*
* @author tjq
* @since 2020/8/8
*/
public class PowerJobDKey {
/**
* The property name for {@link NetworkInterface#getDisplayName() the name of network interface} that the PowerJob application prefers
*/
public static final String PREFERRED_NETWORK_INTERFACE = "powerjob.network.interface.preferred";
/**
* Java regular expressions for network interfaces that will be ignored.
*/
public static final String IGNORED_NETWORK_INTERFACE_REGEX = "powerjob.network.interface.ignored";
}

View File

@ -1,130 +1,196 @@
package com.github.kfcfans.powerjob.common.utils;
/*
Copyright [2020] [PowerJob]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import com.github.kfcfans.powerjob.common.PowerJobDKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.net.*;
import java.util.Enumeration;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.*;
import java.util.regex.Pattern;
import static java.util.Collections.emptyList;
/**
* IP and Port Helper for RPC
*
* @author tjq borrowed from dubbo
* @since 2020/3/16
* @author from dubbo, optimize by tjq
* @since 2020/8/8
*/
@Slf4j
@SuppressWarnings("all")
public class NetUtils {
private static final String ANYHOST_VALUE = "0.0.0.0";
private static final String LOCALHOST_KEY = "localhost";
private static volatile String HOST_ADDRESS;
private static final String LOCALHOST_VALUE = "127.0.0.1";
// returned port range is [30000, 39999]
private static final int RND_PORT_START = 30000;
private static final int RND_PORT_RANGE = 10000;
// valid port range is (0, 65535]
private static final int MIN_PORT = 0;
public static final int MAX_PORT = 65535;
private static final Pattern ADDRESS_PATTERN = Pattern.compile("^\\d{1,3}(\\.\\d{1,3}){3}\\:\\d{1,5}$");
private static final Pattern LOCAL_IP_PATTERN = Pattern.compile("127(\\.\\d{1,3}){3}$");
private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$");
private static volatile InetAddress LOCAL_ADDRESS = null;
private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$");
private static final String ANYHOST_VALUE = "0.0.0.0";
private static final String SPLIT_IPV4_CHARECTER = "\\.";
private static final String SPLIT_IPV6_CHARECTER = ":";
/**
* 获取本机 IP 地址
* @return 本机 IP 地址
*/
public static String getLocalHost() {
if (HOST_ADDRESS != null) {
return HOST_ADDRESS;
}
public static int getRandomPort() {
return RND_PORT_START + ThreadLocalRandom.current().nextInt(RND_PORT_RANGE);
InetAddress address = getLocalAddress();
if (address != null) {
return HOST_ADDRESS = address.getHostAddress();
}
return LOCALHOST_VALUE;
}
public static int getAvailablePort() {
try (ServerSocket ss = new ServerSocket()) {
ss.bind(null);
return ss.getLocalPort();
} catch (IOException e) {
return getRandomPort();
/**
* Find first valid IP from local network card
*
* @return first valid local IP
*/
public static InetAddress getLocalAddress() {
if (LOCAL_ADDRESS != null) {
return LOCAL_ADDRESS;
}
InetAddress localAddress = getLocalAddress0();
LOCAL_ADDRESS = localAddress;
return localAddress;
}
private static InetAddress getLocalAddress0() {
InetAddress localAddress = null;
// @since 2.7.6, choose the {@link NetworkInterface} first
try {
NetworkInterface networkInterface = findNetworkInterface();
Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
Optional<InetAddress> addressOp = toValidAddress(addresses.nextElement());
if (addressOp.isPresent()) {
try {
if (addressOp.get().isReachable(100)) {
return addressOp.get();
}
} catch (IOException e) {
// ignore
}
}
}
} catch (Throwable e) {
log.warn("[Net] getLocalAddress0 failed.", e);
}
try {
localAddress = InetAddress.getLocalHost();
Optional<InetAddress> addressOp = toValidAddress(localAddress);
if (addressOp.isPresent()) {
return addressOp.get();
}
} catch (Throwable e) {
log.warn("[Net] getLocalAddress0 failed.", e);
}
return localAddress;
}
public static int getAvailablePort(int port) {
if (port <= 0) {
return getAvailablePort();
/**
* Get the suitable {@link NetworkInterface}
*
* @return If no {@link NetworkInterface} is available , return <code>null</code>
* @since 2.7.6
*/
public static NetworkInterface findNetworkInterface() {
List<NetworkInterface> validNetworkInterfaces = emptyList();
try {
validNetworkInterfaces = getValidNetworkInterfaces();
} catch (Throwable e) {
log.warn("[Net] findNetworkInterface failed", e);
}
for (int i = port; i < MAX_PORT; i++) {
try (ServerSocket ss = new ServerSocket(i)) {
return i;
} catch (IOException e) {
// continue
NetworkInterface result = null;
// Try to find the preferred one
for (NetworkInterface networkInterface : validNetworkInterfaces) {
if (isPreferredNetworkInterface(networkInterface)) {
result = networkInterface;
log.info("[Net] use preferred network interface: {}", networkInterface.getDisplayName());
break;
}
}
return port;
}
public static boolean isInvalidPort(int port) {
return port <= MIN_PORT || port > MAX_PORT;
}
public static boolean isValidAddress(String address) {
return ADDRESS_PATTERN.matcher(address).matches();
}
public static boolean isLocalHost(String host) {
return host != null
&& (LOCAL_IP_PATTERN.matcher(host).matches()
|| host.equalsIgnoreCase(LOCALHOST_KEY));
}
public static boolean isAnyHost(String host) {
return ANYHOST_VALUE.equals(host);
}
public static boolean isInvalidLocalHost(String host) {
return host == null
|| host.length() == 0
|| host.equalsIgnoreCase(LOCALHOST_KEY)
|| host.equals(ANYHOST_VALUE)
|| (LOCAL_IP_PATTERN.matcher(host).matches());
}
public static boolean isValidLocalHost(String host) {
return !isInvalidLocalHost(host);
}
public static InetSocketAddress getLocalSocketAddress(String host, int port) {
return isInvalidLocalHost(host) ?
new InetSocketAddress(port) : new InetSocketAddress(host, port);
}
static boolean isValidV4Address(InetAddress address) {
if (address == null || address.isLoopbackAddress()) {
return false;
if (result == null) { // If not found, try to get the first one
for (NetworkInterface networkInterface : validNetworkInterfaces) {
Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
Optional<InetAddress> addressOp = toValidAddress(addresses.nextElement());
if (addressOp.isPresent()) {
try {
if (addressOp.get().isReachable(100)) {
result = networkInterface;
break;
}
} catch (IOException e) {
// ignore
}
}
}
}
}
String name = address.getHostAddress();
boolean result = (name != null
&& IP_PATTERN.matcher(name).matches()
&& !ANYHOST_VALUE.equals(name)
&& !LOCALHOST_VALUE.equals(name));
if (result == null) {
result = first(validNetworkInterfaces);
}
return result;
}
private static Optional<InetAddress> toValidAddress(InetAddress address) {
if (address instanceof Inet6Address) {
Inet6Address v6Address = (Inet6Address) address;
if (isPreferIPV6Address()) {
return Optional.ofNullable(normalizeV6Address(v6Address));
}
}
if (isValidV4Address(address)) {
return Optional.of(address);
}
return Optional.empty();
}
/**
* Check if an ipv6 address
*
* @return true if it is reachable
*/
static boolean isPreferIPV6Address() {
boolean preferIpv6 = Boolean.getBoolean("java.net.preferIPv6Addresses");
if (!preferIpv6) {
return Boolean.getBoolean("java.net.preferIPv6Addresses");
}
static boolean isValidV4Address(InetAddress address) {
if (address == null || address.isLoopbackAddress()) {
return false;
}
return false;
String name = address.getHostAddress();
return (name != null
&& IP_PATTERN.matcher(name).matches()
&& !ANYHOST_VALUE.equals(name)
&& !LOCALHOST_VALUE.equals(name));
}
/**
@ -156,299 +222,84 @@ public class NetUtils {
}
/**
* 获取本机 IP 地址
* @return 本机IP地址
* Get the valid {@link NetworkInterface network interfaces}
*
* @return non-null
* @throws SocketException SocketException if an I/O error occurs.
* @since 2.7.6
*/
public static String getLocalHost() {
InetAddress address = getLocalAddress();
return address == null ? LOCALHOST_VALUE : address.getHostAddress();
}
/**
* Find first valid IP from local network card
* @return first valid local IP
*/
public static InetAddress getLocalAddress() {
if (LOCAL_ADDRESS != null) {
return LOCAL_ADDRESS;
}
InetAddress localAddress = getLocalAddress0();
LOCAL_ADDRESS = localAddress;
return localAddress;
}
private static Optional<InetAddress> toValidAddress(InetAddress address) {
if (address instanceof Inet6Address) {
Inet6Address v6Address = (Inet6Address) address;
if (isPreferIPV6Address()) {
return Optional.ofNullable(normalizeV6Address(v6Address));
}
}
if (isValidV4Address(address)) {
return Optional.of(address);
}
return Optional.empty();
}
private static InetAddress getLocalAddress0() {
InetAddress localAddress = null;
try {
localAddress = InetAddress.getLocalHost();
Optional<InetAddress> addressOp = toValidAddress(localAddress);
if (addressOp.isPresent()) {
return addressOp.get();
}
} catch (Throwable e) {
log.warn("[Triple]", e);
}
try {
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
if (null == interfaces) {
return localAddress;
}
while (interfaces.hasMoreElements()) {
try {
NetworkInterface network = interfaces.nextElement();
if (network.isLoopback() || network.isVirtual() || !network.isUp()) {
continue;
}
Enumeration<InetAddress> addresses = network.getInetAddresses();
while (addresses.hasMoreElements()) {
try {
Optional<InetAddress> addressOp = toValidAddress(addresses.nextElement());
if (addressOp.isPresent()) {
try {
if(addressOp.get().isReachable(100)){
return addressOp.get();
}
} catch (IOException e) {
// ignore
}
}
} catch (Throwable e) {
log.warn("[Triple]", e);
}
}
} catch (Throwable e) {
log.warn("[Triple]", e);
}
}
} catch (Throwable e) {
log.warn("[Triple]", e);
}
return localAddress;
}
public static String getHostName(String address) {
try {
int i = address.indexOf(':');
if (i > -1) {
address = address.substring(0, i);
}
InetAddress inetAddress = InetAddress.getByName(address);
if (inetAddress != null) {
return inetAddress.getHostName();
}
} catch (Throwable e) {
// ignore
}
return address;
}
/**
* getIpByHost
* @param hostName hostName
* @return ip address or hostName if UnknownHostException
*/
public static String getIpByHost(String hostName) {
try {
return InetAddress.getByName(hostName).getHostAddress();
} catch (UnknownHostException e) {
return hostName;
}
}
public static String toAddressString(InetSocketAddress address) {
return address.getAddress().getHostAddress() + ":" + address.getPort();
}
public static InetSocketAddress toAddress(String address) {
int i = address.indexOf(':');
String host;
int port;
if (i > -1) {
host = address.substring(0, i);
port = Integer.parseInt(address.substring(i + 1));
} else {
host = address;
port = 0;
}
return new InetSocketAddress(host, port);
}
public static String toURL(String protocol, String host, int port, String path) {
StringBuilder sb = new StringBuilder();
sb.append(protocol).append("://");
sb.append(host).append(':').append(port);
if (path.charAt(0) != '/') {
sb.append('/');
}
sb.append(path);
return sb.toString();
}
public static void joinMulticastGroup(MulticastSocket multicastSocket, InetAddress multicastAddress) throws IOException {
setInterface(multicastSocket, multicastAddress instanceof Inet6Address);
multicastSocket.setLoopbackMode(false);
multicastSocket.joinGroup(multicastAddress);
}
public static void setInterface(MulticastSocket multicastSocket, boolean preferIpv6) throws IOException {
boolean interfaceSet = false;
Enumeration interfaces = NetworkInterface.getNetworkInterfaces();
private static List<NetworkInterface> getValidNetworkInterfaces() throws SocketException {
List<NetworkInterface> validNetworkInterfaces = new LinkedList<>();
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface i = (NetworkInterface) interfaces.nextElement();
Enumeration addresses = i.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress address = (InetAddress) addresses.nextElement();
if (preferIpv6 && address instanceof Inet6Address) {
try {
if(address.isReachable(100)){
multicastSocket.setInterface(address);
interfaceSet = true;
break;
}
} catch (IOException e) {
// ignore
}
} else if (!preferIpv6 && address instanceof Inet4Address) {
try {
if(address.isReachable(100)){
multicastSocket.setInterface(address);
interfaceSet = true;
break;
}
} catch (IOException e) {
// ignore
}
}
NetworkInterface networkInterface = interfaces.nextElement();
if (ignoreNetworkInterface(networkInterface)) { // ignore
continue;
}
if (interfaceSet) {
break;
// 根据用户 -D 参数忽略网卡
if (ignoreInterfaceByConfig(networkInterface.getDisplayName())) {
continue;
}
validNetworkInterfaces.add(networkInterface);
}
return validNetworkInterfaces;
}
/**
* @param networkInterface {@link NetworkInterface}
* @return if the specified {@link NetworkInterface} should be ignored, return <code>true</code>
* @throws SocketException SocketException if an I/O error occurs.
* @since 2.7.6
*/
private static boolean ignoreNetworkInterface(NetworkInterface networkInterface) throws SocketException {
return networkInterface == null
|| networkInterface.isLoopback()
|| networkInterface.isVirtual()
|| !networkInterface.isUp();
}
/**
* Take the first element from the specified collection
*
* @param values the collection object
* @param <T> the type of element of collection
* @return if found, return the first one, or <code>null</code>
* @since 2.7.6
*/
public static <T> T first(Collection<T> values) {
if (values == null || values.isEmpty()) {
return null;
}
if (values instanceof List) {
List<T> list = (List<T>) values;
return list.get(0);
} else {
return values.iterator().next();
}
}
/**
* Is preferred {@link NetworkInterface} or not
*
* @param networkInterface {@link NetworkInterface}
* @return if the name of the specified {@link NetworkInterface} matches
* the property value from {@link com.github.kfcfans.powerjob.common.PowerJobDKey#PREFERRED_NETWORK_INTERFACE}, return <code>true</code>,
* or <code>false</code>
*/
public static boolean isPreferredNetworkInterface(NetworkInterface networkInterface) {
String preferredNetworkInterface = System.getProperty(PowerJobDKey.PREFERRED_NETWORK_INTERFACE);
return Objects.equals(networkInterface.getDisplayName(), preferredNetworkInterface);
}
public static boolean matchIpRange(String pattern, String host, int port) throws UnknownHostException {
if (pattern == null || host == null) {
throw new IllegalArgumentException("Illegal Argument pattern or hostName. Pattern:" + pattern + ", Host:" + host);
}
pattern = pattern.trim();
if ("*.*.*.*".equals(pattern) || "*".equals(pattern)) {
return true;
}
InetAddress inetAddress = InetAddress.getByName(host);
boolean isIpv4 = isValidV4Address(inetAddress) ? true : false;
String[] hostAndPort = getPatternHostAndPort(pattern, isIpv4);
if (hostAndPort[1] != null && !hostAndPort[1].equals(String.valueOf(port))) {
static boolean ignoreInterfaceByConfig(String interfaceName) {
String regex = System.getProperty(PowerJobDKey.IGNORED_NETWORK_INTERFACE_REGEX);
if (StringUtils.isBlank(regex)) {
return false;
}
pattern = hostAndPort[0];
String splitCharacter = SPLIT_IPV4_CHARECTER;
if (!isIpv4) {
splitCharacter = SPLIT_IPV6_CHARECTER;
}
String[] mask = pattern.split(splitCharacter);
//check format of pattern
checkHostPattern(pattern, mask, isIpv4);
host = inetAddress.getHostAddress();
String[] ipAddress = host.split(splitCharacter);
if (pattern.equals(host)) {
if (interfaceName.matches(regex)) {
log.info("[Net] ignore network interface: {} by regex({})", interfaceName, regex);
return true;
}
// short name condition
if (!ipPatternContainExpression(pattern)) {
InetAddress patternAddress = InetAddress.getByName(pattern);
if (patternAddress.getHostAddress().equals(host)) {
return true;
} else {
return false;
}
}
for (int i = 0; i < mask.length; i++) {
if ("*".equals(mask[i]) || mask[i].equals(ipAddress[i])) {
continue;
} else if (mask[i].contains("-")) {
String[] rangeNumStrs = mask[i].split("-");
if (rangeNumStrs.length != 2) {
throw new IllegalArgumentException("There is wrong format of ip Address: " + mask[i]);
}
Integer min = getNumOfIpSegment(rangeNumStrs[0], isIpv4);
Integer max = getNumOfIpSegment(rangeNumStrs[1], isIpv4);
Integer ip = getNumOfIpSegment(ipAddress[i], isIpv4);
if (ip < min || ip > max) {
return false;
}
} else if ("0".equals(ipAddress[i]) && ("0".equals(mask[i]) || "00".equals(mask[i]) || "000".equals(mask[i]) || "0000".equals(mask[i]))) {
continue;
} else if (!mask[i].equals(ipAddress[i])) {
return false;
}
}
return true;
}
private static boolean ipPatternContainExpression(String pattern) {
return pattern.contains("*") || pattern.contains("-");
}
private static void checkHostPattern(String pattern, String[] mask, boolean isIpv4) {
if (!isIpv4) {
if (mask.length != 8 && ipPatternContainExpression(pattern)) {
throw new IllegalArgumentException("If you config ip expression that contains '*' or '-', please fill qulified ip pattern like 234e:0:4567:0:0:0:3d:*. ");
}
if (mask.length != 8 && !pattern.contains("::")) {
throw new IllegalArgumentException("The host is ipv6, but the pattern is not ipv6 pattern : " + pattern);
}
} else {
if (mask.length != 4) {
throw new IllegalArgumentException("The host is ipv4, but the pattern is not ipv4 pattern : " + pattern);
}
}
}
private static String[] getPatternHostAndPort(String pattern, boolean isIpv4) {
String[] result = new String[2];
if (pattern.startsWith("[") && pattern.contains("]:")) {
int end = pattern.indexOf("]:");
result[0] = pattern.substring(1, end);
result[1] = pattern.substring(end + 2);
return result;
} else if (pattern.startsWith("[") && pattern.endsWith("]")) {
result[0] = pattern.substring(1, pattern.length() - 1);
result[1] = null;
return result;
} else if (isIpv4 && pattern.contains(":")) {
int end = pattern.indexOf(":");
result[0] = pattern.substring(0, end);
result[1] = pattern.substring(end + 1);
return result;
} else {
result[0] = pattern;
return result;
}
}
private static Integer getNumOfIpSegment(String ipSegment, boolean isIpv4) {
if (isIpv4) {
return Integer.parseInt(ipSegment);
}
return Integer.parseInt(ipSegment, 16);
return false;
}
}

View File

@ -0,0 +1,30 @@
import com.github.kfcfans.powerjob.common.PowerJobDKey;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import org.junit.jupiter.api.Test;
/**
* NetUtilsTest
*
* @author tjq
* @since 2020/8/8
*/
public class NetUtilsTest {
@Test
public void testOrigin() {
System.out.println(NetUtils.getLocalHost());
}
@Test
public void testPreferredNetworkInterface() {
System.setProperty(PowerJobDKey.PREFERRED_NETWORK_INTERFACE, "en5");
System.out.println(NetUtils.getLocalHost());
}
@Test
public void testIgnoredNetworkInterface() {
System.setProperty(PowerJobDKey.IGNORED_NETWORK_INTERFACE_REGEX, "utun.|llw.");
System.out.println(NetUtils.getLocalHost());
}
}

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.2.2</version>
<version>3.2.3</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.common.version>3.2.2</powerjob.common.version>
<powerjob.common.version>3.2.3</powerjob.common.version>
<!-- 数据库驱动版本使用的是spring-boot-dependencies管理的版本 -->
<mysql.version>8.0.19</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version>
@ -30,6 +30,7 @@
<mvn.invoker.version>3.0.1</mvn.invoker.version>
<commons.net.version>3.6</commons.net.version>
<fastjson.version>1.2.68</fastjson.version>
<dingding.version>1.0.1</dingding.version>
<!-- 部署时跳过该module -->
<maven.deploy.skip>true</maven.deploy.skip>
@ -160,6 +161,14 @@
<version>${fastjson.version}</version>
</dependency>
<!-- 钉钉 报警通知 -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>alibaba-dingtalk-service-sdk</artifactId>
<version>${dingding.version}</version>
</dependency>
<!-- swagger2 -->
<dependency>

View File

@ -12,10 +12,6 @@ public class PowerJobServerConfigKey {
* akka 端口号
*/
public static final String AKKA_PORT = "oms.akka.port";
/**
* alarm bean 名称多值逗号分隔
*/
public static final String ALARM_BEAN_NAMES = "oms.alarm.bean.names";
/**
* 自定义数据库表前缀
*/
@ -24,4 +20,11 @@ public class PowerJobServerConfigKey {
* 是否使用 mongoDB
*/
public static final String MONGODB_ENABLE = "oms.mongodb.enable";
/**
* 钉钉报警相关
*/
public static final String DING_APP_KEY = "oms.alarm.ding.app-key";
public static final String DING_APP_SECRET = "oms.alarm.ding.app-secret";
public static final String DING_AGENT_ID = "oms.alarm.ding.agent-id";
}

View File

@ -13,7 +13,6 @@ import java.util.concurrent.*;
/**
* 公用线程池配置
* omsTimingPool用于执行定时任务的线程池
* omsCommonPool用于执行普通任务的线程池
* omsBackgroundPool用于执行后台任务的线程池这类任务对时间不敏感慢慢执行细水长流即可
* taskScheduler用于定时调度的线程池
*
@ -42,19 +41,6 @@ public class ThreadPoolConfig {
return executor;
}
@Bean("omsCommonPool")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsCommonPool-");
executor.setRejectedExecutionHandler(new LogOnRejected());
return executor;
}
@Bean("omsBackgroundPool")
public Executor initBackgroundPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.powerjob.server.common.utils;
/*
Copyright [2020] [KFCFans]
Copyright [2020] [PowerJob]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@ -0,0 +1,131 @@
package com.github.kfcfans.powerjob.server.common.utils;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiGettokenRequest;
import com.dingtalk.api.request.OapiMessageCorpconversationAsyncsendV2Request;
import com.dingtalk.api.request.OapiUserGetByMobileRequest;
import com.dingtalk.api.response.OapiGettokenResponse;
import com.dingtalk.api.response.OapiUserGetByMobileResponse;
import com.github.kfcfans.powerjob.common.OmsException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpMethod;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 钉钉工具类
* 工作通知消息https://ding-doc.dingtalk.com/doc#/serverapi2/pgoxpy
*
* @author tjq
* @since 2020/8/8
*/
@Slf4j
public class DingTalkUtils implements Closeable {
private String accessToken;
private final DingTalkClient sendMsgClient;
private final DingTalkClient accessTokenClient;
private final DingTalkClient userIdClient;
private final ScheduledExecutorService scheduledPool;
private static final long FLUSH_ACCESS_TOKEN_RATE = 6000;
private static final String GET_TOKEN_URL = "https://oapi.dingtalk.com/gettoken";
private static final String SEND_URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2";
private static final String GET_USER_ID_URL = "https://oapi.dingtalk.com/user/get_by_mobile";
public DingTalkUtils(String appKey, String appSecret) {
this.sendMsgClient = new DefaultDingTalkClient(SEND_URL);
this.accessTokenClient = new DefaultDingTalkClient(GET_TOKEN_URL);
this.userIdClient = new DefaultDingTalkClient(GET_USER_ID_URL);
refreshAccessToken(appKey, appSecret);
if (StringUtils.isEmpty(accessToken)) {
throw new OmsException("fetch AccessToken failed, please check your appKey & appSecret");
}
scheduledPool = Executors.newSingleThreadScheduledExecutor();
scheduledPool.scheduleAtFixedRate(() -> refreshAccessToken(appKey, appSecret), FLUSH_ACCESS_TOKEN_RATE, FLUSH_ACCESS_TOKEN_RATE, TimeUnit.SECONDS);
}
/**
* 获取 AccessTokenAccessToken 是调用其他接口的基础有效期 7200 需要不断刷新
* @param appKey 应用 appKey
* @param appSecret 应用 appSecret
*/
private void refreshAccessToken(String appKey, String appSecret) {
try {
OapiGettokenRequest req = new OapiGettokenRequest();
req.setAppkey(appKey);
req.setAppsecret(appSecret);
req.setHttpMethod(HttpMethod.GET.name());
OapiGettokenResponse rsp = accessTokenClient.execute(req);
if (rsp.isSuccess()) {
accessToken = rsp.getAccessToken();
}else {
log.warn("[DingTalkUtils] flush accessToken failed with req({}),code={},msg={}.", req.getTextParams(), rsp.getErrcode(), rsp.getErrmsg());
}
} catch (Exception e) {
log.warn("[DingTalkUtils] flush accessToken failed.", e);
}
}
public String fetchUserIdByMobile(String mobile) throws Exception {
OapiUserGetByMobileRequest request = new OapiUserGetByMobileRequest();
request.setMobile(mobile);
OapiUserGetByMobileResponse execute = userIdClient.execute(request, accessToken);
if (execute.isSuccess()) {
return execute.getUserid();
}
log.info("[DingTalkUtils] fetch userId by mobile({}) failed,reason is {}.", mobile, execute.getErrmsg());
throw new OmsException("fetch userId by phone number failed, reason is " + execute.getErrmsg());
}
public void sendMarkdownAsync(String title, List<MarkdownEntity> entities, String userList, Long agentId) throws Exception {
OapiMessageCorpconversationAsyncsendV2Request request = new OapiMessageCorpconversationAsyncsendV2Request();
request.setUseridList(userList);
request.setAgentId(agentId);
request.setToAllUser(false);
OapiMessageCorpconversationAsyncsendV2Request.Msg msg = new OapiMessageCorpconversationAsyncsendV2Request.Msg();
StringBuilder mdBuilder=new StringBuilder();
mdBuilder.append("## ").append(title).append("\n");
for (MarkdownEntity entity:entities){
mdBuilder.append("#### ").append(entity.title).append("\n");
mdBuilder.append("> ").append(entity.detail).append("\n\n");
}
msg.setMsgtype("markdown");
msg.setMarkdown(new OapiMessageCorpconversationAsyncsendV2Request.Markdown());
msg.getMarkdown().setTitle(title);
msg.getMarkdown().setText(mdBuilder.toString());
request.setMsg(msg);
sendMsgClient.execute(request, accessToken);
}
@Override
public void close() throws IOException {
scheduledPool.shutdownNow();
}
@AllArgsConstructor
public static final class MarkdownEntity {
private String title;
private String detail;
}
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.service.alarm;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import org.apache.commons.lang3.StringUtils;
@ -29,7 +30,7 @@ public interface Alarm extends OmsSerializable {
}catch (Exception ignore) {
}
}
sb.append(word).append("\n\r");
sb.append(word).append(OmsConstant.LINE_SEPARATOR);
});
return sb.toString();
}

View File

@ -0,0 +1,46 @@
package com.github.kfcfans.powerjob.server.service.alarm;
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.*;
/**
* 报警服务
*
* @author tjq
* @since 2020/4/19
*/
@Slf4j
public class AlarmCenter {
private static final ExecutorService POOL;
private static final List<Alarmable> BEANS = Lists.newLinkedList();
private static final int THREAD_KEEP_ALIVE_TIME_M = 5;
static {
int cores = Runtime.getRuntime().availableProcessors();
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("AlarmPool-%d").build();
POOL = new ThreadPoolExecutor(cores, cores, THREAD_KEEP_ALIVE_TIME_M, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue(), factory);
}
public static void alarmFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
POOL.execute(() -> BEANS.forEach(alarmable -> {
try {
alarmable.onFailed(alarm, targetUserList);
}catch (Exception e) {
log.warn("[AlarmCenter] alarm failed.", e);
}
}));
}
public static void register(Alarmable alarmable) {
BEANS.add(alarmable);
log.info("[AlarmCenter] bean(className={},obj={}) register to AlarmCenter successfully!", alarmable.getClass().getName(), alarmable);
}
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.service.alarm;
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
import org.springframework.beans.factory.InitializingBean;
import java.util.List;
@ -10,8 +11,12 @@ import java.util.List;
* @author tjq
* @since 2020/4/19
*/
public interface Alarmable {
public interface Alarmable extends InitializingBean {
void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
@Override
default void afterPropertiesSet() throws Exception {
AlarmCenter.register(this);
}
}

View File

@ -1,77 +0,0 @@
package com.github.kfcfans.powerjob.server.service.alarm;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
/**
* 报警服务
*
* @author tjq
* @since 2020/4/19
*/
@Slf4j
@Service("omsCenterAlarmService")
public class OmsCenterAlarmService implements Alarmable {
@Resource
private Environment environment;
private List<Alarmable> alarmableList;
private volatile boolean initialized = false;
@Async("omsCommonPool")
@Override
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
init();
alarmableList.forEach(alarmable -> {
try {
alarmable.onFailed(alarm, targetUserList);
}catch (Exception e) {
log.warn("[OmsCenterAlarmService] alarm failed.", e);
}
});
}
/**
* 初始化
* 使用 InitializingBean 进行初始化会导致 NPE因为没办法控制Bean开发者自己实现的Bean的加载顺序
*/
private void init() {
if (initialized) {
return;
}
synchronized (this) {
if (initialized) {
return;
}
alarmableList = Lists.newLinkedList();
String beanNames = environment.getProperty(PowerJobServerConfigKey.ALARM_BEAN_NAMES);
if (StringUtils.isNotEmpty(beanNames)) {
SJ.commaSplitter.split(beanNames).forEach(beanName -> {
try {
Alarmable bean = (Alarmable) SpringUtils.getBean(beanName);
alarmableList.add(bean);
log.info("[OmsCenterAlarmService] load Alarmable for bean: {} successfully.", beanName);
}catch (Exception e) {
log.warn("[OmsCenterAlarmService] initialize Alarmable for bean: {} failed.", beanName, e);
}
});
}
initialized = true;
}
}
}

View File

@ -0,0 +1,109 @@
package com.github.kfcfans.powerjob.server.service.alarm.impl;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.utils.DingTalkUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
import com.github.kfcfans.powerjob.server.service.alarm.Alarm;
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
import java.util.Set;
/**
* 钉钉告警服务
*
* @author tjq
* @since 2020/8/6
*/
@Slf4j
@Service
public class DingTalkAlarmService implements Alarmable {
@Resource
private Environment environment;
private Long agentId;
private DingTalkUtils dingTalkUtils;
private Cache<String, String> mobile2UserIdCache;
private static final int CACHE_SIZE = 8192;
// 防止缓存击穿
private static final String EMPTY_TAG = "EMPTY";
@Override
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
if (dingTalkUtils == null) {
return;
}
Set<String> userIds = Sets.newHashSet();
targetUserList.forEach(user -> {
try {
String userId = mobile2UserIdCache.get(user.getPhone(), () -> {
try {
return dingTalkUtils.fetchUserIdByMobile(user.getPhone());
} catch (OmsException ignore) {
return EMPTY_TAG;
} catch (Exception ignore) {
return null;
}
});
if (!EMPTY_TAG.equals(userId)) {
userIds .add(userId);
}
}catch (Exception ignore) {
}
});
userIds.remove(null);
if (!userIds.isEmpty()) {
String userListStr = SJ.commaJoiner.skipNulls().join(userIds);
List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList();
markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost()));
String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA);
markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content));
try {
dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId);
}catch (Exception e) {
log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage());
}
}
}
@PostConstruct
public void init() {
String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID);
String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY);
String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET);
log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId);
if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) {
log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable");
return;
}
if (!StringUtils.isNumeric(agentId)) {
log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId);
return;
}
this.agentId = Long.valueOf(agentId);
dingTalkUtils = new DingTalkUtils(appKey, appSecret);
mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!");
}
}

View File

@ -1,6 +1,8 @@
package com.github.kfcfans.powerjob.server.service.alarm;
package com.github.kfcfans.powerjob.server.service.alarm.impl;
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
import com.github.kfcfans.powerjob.server.service.alarm.Alarm;
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
@ -20,8 +22,8 @@ import java.util.List;
* @since 2020/4/30
*/
@Slf4j
@Service("omsDefaultMailAlarmService")
public class DefaultMailAlarmService implements Alarmable {
@Service
public class MailAlarmService implements Alarmable {
@Resource
private Environment environment;
@ -47,7 +49,7 @@ public class DefaultMailAlarmService implements Alarmable {
javaMailSender.send(sm);
}catch (Exception e) {
log.error("[OmsMailAlarmService] send mail({}) failed, reason is {}", sm, e.getMessage());
log.error("[MailAlarmService] send mail failed, reason is {}", e.getMessage());
}
}

View File

@ -11,7 +11,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.service.InstanceLogService;
import com.github.kfcfans.powerjob.server.service.UserService;
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter;
import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarm;
import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
@ -38,8 +38,6 @@ public class InstanceManager {
private DispatchService dispatchService;
@Resource
private InstanceLogService instanceLogService;
@Resource(name = "omsCenterAlarmService")
private Alarmable omsCenterAlarmService;
@Resource
private InstanceMetadataService instanceMetadataService;
@Resource
@ -167,7 +165,7 @@ public class InstanceManager {
BeanUtils.copyProperties(instanceInfo, content);
List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds());
omsCenterAlarmService.onFailed(content, userList);
AlarmCenter.alarmFailed(content, userList);
}
// 主动移除缓存减小内存占用

View File

@ -19,7 +19,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowIn
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.service.UserService;
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter;
import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarm;
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
@ -62,9 +62,6 @@ public class WorkflowInstanceManager {
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Resource(name = "omsCenterAlarmService")
private Alarmable omsCenterAlarmService;
private final SegmentLock segmentLock = new SegmentLock(16);
/**
@ -339,7 +336,7 @@ public class WorkflowInstanceManager {
content.setResult(result);
List<UserInfoDO> userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds());
omsCenterAlarmService.onFailed(content, userList);
AlarmCenter.alarmFailed(content, userList);
});
}catch (Exception ignore) {
}

View File

@ -21,6 +21,11 @@ spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
####### 钉钉报警配置(不需要钉钉报警可以删除以下配置来避免报错) #######
oms.alarm.ding.app-key=dingauqwkvxxnqskknfv
oms.alarm.ding.app-secret=XWrEPdAZMPgJeFtHuL0LH73LRj-74umF2_0BFcoXMfvnX0pCQvt0rpb1JOJU_HLl
oms.alarm.ding.agent-id=847044348
####### 资源清理配置 #######
oms.instanceinfo.retention=1
oms.container.retention.local=1

View File

@ -21,6 +21,11 @@ spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
####### 钉钉报警配置(不需要钉钉报警可以删除以下配置来避免报错) #######
oms.alarm.ding.app-key=dingauqwkvxxnqskknfv
oms.alarm.ding.app-secret=XWrEPdAZMPgJeFtHuL0LH73LRj-74umF2_0BFcoXMfvnX0pCQvt0rpb1JOJU_HLl
oms.alarm.ding.agent-id=847044348
####### 资源清理配置 #######
oms.instanceinfo.retention=3
oms.container.retention.local=3

View File

@ -21,6 +21,11 @@ spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
####### 钉钉报警配置(不需要钉钉报警可以删除以下配置来避免报错) #######
oms.alarm.ding.app-key=
oms.alarm.ding.app-secret=
oms.alarm.ding.agent-id=
####### 资源清理配置 #######
oms.instanceinfo.retention=7
oms.container.retention.local=7

View File

@ -16,7 +16,5 @@ spring.servlet.multipart.max-request-size=209715200
###### PowerJob 自身配置(该配置只允许存在于 application.properties 文件中) ######
# akka ActorSystem 服务端口
oms.akka.port=10086
# 报警服务 bean名称
oms.alarm.bean.names=omsDefaultMailAlarmService
# 表前缀(默认无表前缀,有需求直接填入表前缀即可,比如 pj_
oms.table-prefix=

View File

@ -10,7 +10,7 @@
converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
<!-- 彩色日志格式 -->
<property name="CONSOLE_LOG_PATTERN"
value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{20}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
<!-- Console 输出设置 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">

View File

@ -16,7 +16,7 @@
<MaxHistory>7</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{20} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
@ -53,7 +53,7 @@
<MaxHistory>7</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{20} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>

View File

@ -0,0 +1,37 @@
package com.github.kfcfans.powerjob.server.test;
import com.github.kfcfans.powerjob.server.common.utils.DingTalkUtils;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
import java.util.List;
/**
* 测试钉钉消息工具
*
* @author tjq
* @since 2020/8/8
*/
public class DingTalkTest {
private static final Long AGENT_ID = 847044348L;
private static final DingTalkUtils dingTalkUtils = new DingTalkUtils("dingauqwkvxxnqskknfv", "XWrEPdAZMPgJeFtHuL0LH73LRj-74umF2_0BFcoXMfvnX0pCQvt0rpb1JOJU_HLl");
@Test
public void testFetchUserId() throws Exception {
System.out.println(dingTalkUtils.fetchUserIdByMobile("38353"));
}
@Test
public void testSendMarkdown() throws Exception {
String userId = "2159453017839770,1234";
List<DingTalkUtils.MarkdownEntity> mds = Lists.newLinkedList();
mds.add(new DingTalkUtils.MarkdownEntity("t111","hahahahahahahha1"));
mds.add(new DingTalkUtils.MarkdownEntity("t2222","hahahahahahahha2"));
mds.add(new DingTalkUtils.MarkdownEntity("t3333","hahahahahahahha3"));
dingTalkUtils.sendMarkdownAsync("PowerJob AlarmService", mds, userId, AGENT_ID);
}
}

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.2.2</version>
<version>3.2.3</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.2.2-bugfix</powerjob.worker.version>
<powerjob.worker.version>3.2.3</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.2.2</version>
<version>3.2.3</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.starter.version>3.2.2-bugfix</powerjob.worker.starter.version>
<powerjob.worker.starter.version>3.2.3</powerjob.worker.starter.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->

View File

@ -2,12 +2,14 @@ server.port=8081
spring.jpa.open-in-view=false
########### powerjob-worker 配置 ###########
########### powerjob-worker 配置(老配置 powerjob.xxx 即将废弃,请使用 powerjob.worker.xxx ###########
# akka 工作端口,可选,默认 27777
powerjob.akka-port=27777
powerjob.worker.akka-port=27777
# 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
powerjob.app-name=powerjob-agent-test
powerjob.worker.app-name=powerjob-agent-test
# 调度服务器地址IP:Port 或 域名,多值逗号分隔
powerjob.server-address=127.0.0.1:7700,127.0.0.1:7701
powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701
# 持久化方式,可选,默认 disk
powerjob.store-strategy=disk
powerjob.worker.store-strategy=disk
# 返回值最大长度,默认 8096
powerjob.worker.max-result-length=4096

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>3.2.2-bugfix</version>
<version>3.2.3</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.2.2-bugfix</powerjob.worker.version>
<powerjob.worker.version>3.2.3</powerjob.worker.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
</properties>

View File

@ -3,9 +3,12 @@ package com.github.kfcfans.powerjob.worker.autoconfigure;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
@ -19,32 +22,53 @@ import java.util.List;
*/
@Configuration
@EnableConfigurationProperties(PowerJobProperties.class)
@Conditional(PowerJobAutoConfiguration.PowerJobWorkerCondition.class)
public class PowerJobAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public OhMyWorker initPowerJob(PowerJobProperties properties) {
PowerJobProperties.Worker worker = properties.getWorker();
// 服务器HTTP地址端口号为 server.port而不是 ActorSystem port请勿添加任何前缀http://
CommonUtils.requireNonNull(properties.getServerAddress(), "serverAddress can't be empty!");
List<String> serverAddress = Arrays.asList(properties.getServerAddress().split(","));
CommonUtils.requireNonNull(worker.getServerAddress(), "serverAddress can't be empty!");
List<String> serverAddress = Arrays.asList(worker.getServerAddress().split(","));
// 1. 创建配置文件
OhMyConfig config = new OhMyConfig();
// 可以不显式设置默认值 27777
config.setPort(properties.getAkkaPort());
config.setPort(worker.getAkkaPort());
// appName需要提前在控制台注册否则启动报错
config.setAppName(properties.getAppName());
config.setAppName(worker.getAppName());
config.setServerAddress(serverAddress);
// 如果没有大型 Map/MapReduce 的需求建议使用内存来加速计算
// 有大型 Map/MapReduce 需求可能产生大量子任务Task的场景请使用 DISK否则妥妥的 OutOfMemory
config.setStoreStrategy(properties.getStoreStrategy());
config.setStoreStrategy(worker.getStoreStrategy());
// 启动测试模式true情况下不再尝试连接 server 并验证appName
config.setEnableTestMode(properties.isEnableTestMode());
config.setEnableTestMode(worker.isEnableTestMode());
// 2. 创建 Worker 对象设置配置文件
OhMyWorker ohMyWorker = new OhMyWorker();
ohMyWorker.setConfig(config);
return ohMyWorker;
}
static class PowerJobWorkerCondition extends AnyNestedCondition {
public PowerJobWorkerCondition() {
super(ConfigurationPhase.PARSE_CONFIGURATION);
}
@Deprecated
@ConditionalOnProperty(prefix = "powerjob", name = "server-address")
static class PowerJobProperty {
}
@ConditionalOnProperty(prefix = "powerjob.worker", name = "server-address")
static class PowerJobWorkerProperty {
}
}
}

View File

@ -3,8 +3,10 @@ package com.github.kfcfans.powerjob.worker.autoconfigure;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
/**
* PowerJob 配置项
@ -12,33 +14,114 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
* @author songyinyin
* @since 2020/7/26 16:37
*/
@Data
@ConfigurationProperties(prefix = "powerjob")
public class PowerJobProperties {
private final Worker worker = new Worker();
public Worker getWorker() {
return worker;
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.app-name")
public String getAppName() {
return getWorker().appName;
}
@Deprecated
public void setAppName(String appName) {
getWorker().setAppName(appName);
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.akka-port")
public int getAkkaPort() {
return getWorker().akkaPort;
}
@Deprecated
public void setAkkaPort(int akkaPort) {
getWorker().setAkkaPort(akkaPort);
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.server-address")
public String getServerAddress() {
return getWorker().serverAddress;
}
@Deprecated
public void setServerAddress(String serverAddress) {
getWorker().setServerAddress(serverAddress);
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.store-strategy")
public StoreStrategy getStoreStrategy() {
return getWorker().storeStrategy;
}
@Deprecated
public void setStoreStrategy(StoreStrategy storeStrategy) {
getWorker().setStoreStrategy(storeStrategy);
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.max-result-length")
public int getMaxResultLength() {
return getWorker().maxResultLength;
}
@Deprecated
public void setMaxResultLength(int maxResultLength) {
getWorker().setMaxResultLength(maxResultLength);
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.enable-test-mode")
public boolean isEnableTestMode() {
return getWorker().enableTestMode;
}
@Deprecated
public void setEnableTestMode(boolean enableTestMode) {
getWorker().setEnableTestMode(enableTestMode);
}
/**
* 应用名称需要提前在控制台注册否则启动报错
* 客户端 配置项
*/
private String appName;
/**
* 启动 akka 端口
*/
private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT;
/**
* 调度服务器地址ip:port 域名多个用英文逗号分隔
*/
private String serverAddress;
/**
* 本地持久化方式默认使用磁盘
*/
private StoreStrategy storeStrategy = StoreStrategy.DISK;
/**
* 最大返回值长度超过会被截断
* {@link ProcessResult}#msg 的最大长度
*/
private int maxResultLength = 8096;
/**
* 启动测试模式true情况下不再尝试连接 server 并验证appName
* true -> 用于本地写单元测试调试 false -> 默认值标准模式
*/
private boolean enableTestMode = false;
@Setter
@Getter
public static class Worker {
/**
* 应用名称需要提前在控制台注册否则启动报错
*/
private String appName;
/**
* 启动 akka 端口
*/
private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT;
/**
* 调度服务器地址ip:port 域名多个用英文逗号分隔
*/
private String serverAddress;
/**
* 本地持久化方式默认使用磁盘
*/
private StoreStrategy storeStrategy = StoreStrategy.DISK;
/**
* 最大返回值长度超过会被截断
* {@link ProcessResult}#msg 的最大长度
*/
private int maxResultLength = 8096;
/**
* 启动测试模式true情况下不再尝试连接 server 并验证appName
* true -> 用于本地写单元测试调试 false -> 默认值标准模式
*/
private boolean enableTestMode = false;
}
}

View File

@ -4,46 +4,106 @@
"name": "powerjob",
"type": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
},
{
"name": "powerjob.worker",
"type": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"sourceMethod": "getWorker()"
}
],
"properties": [
{
"name": "powerjob.app-name",
"type": "java.lang.String",
"description": "应用名称,需要提前在控制台注册,否则启动报错",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
"name": "powerjob.worker.akka-port",
"type": "java.lang.Integer",
"description": "启动 akka 端口",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
},
{
"name": "powerjob.max-result-length",
"name": "powerjob.worker.app-name",
"type": "java.lang.String",
"description": "应用名称,需要提前在控制台注册,否则启动报错",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
},
{
"name": "powerjob.worker.enable-test-mode",
"type": "java.lang.Boolean",
"description": "启动测试模式true情况下不再尝试连接 server 并验证appName。 true -> 用于本地写单元测试调试; false -> 默认值,标准模式",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker",
"defaultValue": false
},
{
"name": "powerjob.worker.max-result-length",
"type": "java.lang.Integer",
"description": "最大返回值长度,超过会被截断 {@link ProcessResult}#msg 的最大长度",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker",
"defaultValue": 8096
},
{
"name": "powerjob.worker.server-address",
"type": "java.lang.String",
"description": "调度服务器地址ip:port 或 域名,多个用英文逗号分隔",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
},
{
"name": "powerjob.worker.store-strategy",
"type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
"description": "本地持久化方式,默认使用磁盘",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
},
{
"name": "powerjob.akka-port",
"type": "java.lang.Integer",
"description": "启动 akka 端口",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.akka-port"
}
},
{
"name": "powerjob.server-address",
"name": "powerjob.app-name",
"type": "java.lang.String",
"description": "调度服务器地址ip:port 或 域名,多值用英文逗号分隔",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
},
{
"name": "powerjob.store-strategy",
"type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
"description": "本地持久化方式,默认使用磁盘",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.app-name"
}
},
{
"name": "powerjob.enable-test-mode",
"type": "java.lang.Boolean",
"description": "启动测试模式true情况下不再尝试连接 server 并验证appName。true -> 用于本地写单元测试调试; false -> 默认值,标准模式",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"defaultValue": false
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.enable-test-mode"
}
},
{
"name": "powerjob.max-result-length",
"type": "java.lang.Integer",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.max-result-length"
}
},
{
"name": "powerjob.server-address",
"type": "java.lang.String",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.server-address"
}
},
{
"name": "powerjob.store-strategy",
"type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.store-strategy"
}
}
],
"hints": []

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.2.2-bugfix</version>
<version>3.2.3</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.2.2</powerjob.common.version>
<powerjob.common.version>3.2.3</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

View File

@ -1,30 +0,0 @@
package com.github.kfcfans.powerjob;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SystemInfoUtils;
import org.junit.jupiter.api.Test;
/**
* 测试工具类
*
* @author tjq
* @since 2020/3/24
*/
public class UtilsTest {
@Test
public void testNetUtils() {
System.out.println("本机IP" + NetUtils.getLocalHost());
System.out.println("端口:" + NetUtils.getAvailablePort(7777));
}
@Test
public void testSystemInfoUtils() {
System.out.println(SystemInfoUtils.getSystemMetrics());
}
@Test
public void testSerializeUtils() {
}
}