Merge branch '4.3.7_v2'

tjq 2024-02-09 13:36:21 +08:00
commit 78b58d02e8
63 changed files with 755 additions and 230 deletions

View File

@ -1,5 +1,9 @@
# English | [简体中文](./README_zhCN.md)
<p align="center">
🏮 The entire PowerJob team wishes everyone a soaring Year of the Dragon: good health, success in all things, family happiness, and well-being in the new year! 🏮
</p>
<p align="center">
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/logo.png" alt="PowerJob" title="PowerJob" width="557"/>
</p>

View File

@ -1,5 +1,9 @@
# [English](./README.md) | 简体中文
<p align="center">
🏮 The entire PowerJob team wishes everyone a soaring Year of the Dragon: good health, success in all things, family happiness, and well-being in the new year! 🏮
</p>
<p align="center">
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/logo.png" alt="PowerJob" title="PowerJob" width="557"/>
</p>

View File

@ -6,7 +6,7 @@
<groupId>tech.powerjob</groupId>
<artifactId>powerjob</artifactId>
<version>4.3.6</version>
<version>4.3.7</version>
<packaging>pom</packaging>
<name>powerjob</name>
<url>http://www.powerjob.tech</url>

View File

@ -5,18 +5,18 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>4.3.6</version>
<version>4.3.7</version>
<packaging>jar</packaging>
<properties>
<junit.version>5.9.1</junit.version>
<fastjson.version>1.2.83</fastjson.version>
<powerjob.common.version>4.3.6</powerjob.common.version>
<powerjob.common.version>4.3.7</powerjob.common.version>
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
</properties>

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>4.3.6</version>
<version>4.3.7</version>
<packaging>jar</packaging>
<properties>

View File

@ -0,0 +1,26 @@
package tech.powerjob.common.enhance;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ScheduledExecutorService;
/**
* A safe Runnable that prevents a periodic task from being terminated by an uncaught exception.
* When running tasks with a {@link ScheduledExecutorService}, it is recommended to extend this class: it catches and logs exceptions so a thrown exception does not cancel the periodic task.
*
* @author songyinyin
* @since 2023/9/20 15:52
*/
@Slf4j
public abstract class SafeRunnable implements Runnable{
@Override
public void run() {
try {
run0();
} catch (Exception e) {
log.error("[SafeRunnable] run failed", e);
}
}
protected abstract void run0();
}
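
A minimal usage sketch (hypothetical, not part of this commit): a periodic job extends SafeRunnable so that one failing execution is logged instead of silently cancelling the whole schedule.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import tech.powerjob.common.enhance.SafeRunnable;

public class SafeRunnableUsageSketch {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        // If run0() throws, SafeRunnable logs the exception instead of letting it
        // propagate and cancel the periodic schedule.
        pool.scheduleAtFixedRate(new SafeRunnable() {
            @Override
            protected void run0() {
                System.out.println("heartbeat tick");
            }
        }, 0, 10, TimeUnit.SECONDS);
    }
}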

View File

@ -0,0 +1,30 @@
package tech.powerjob.common.enhance;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ScheduledExecutorService;
/**
* When running tasks with a {@link ScheduledExecutorService}, it is recommended to wrap the task in this class to avoid an uncaught exception terminating the periodic task.
*
* @author songyinyin
* @since 2023/9/20 16:04
*/
@Slf4j
public class SafeRunnableWrapper implements Runnable {
private final Runnable runnable;
public SafeRunnableWrapper(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
try {
runnable.run();
} catch (Exception e) {
log.error("[SafeRunnableWrapper] run failed", e);
}
}
}
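
A similar sketch (hypothetical names) for the wrapper variant, which suits an existing Runnable or method reference rather than a subclass:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import tech.powerjob.common.enhance.SafeRunnableWrapper;

public class SafeRunnableWrapperUsageSketch {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        // The wrapper catches and logs exceptions thrown by the delegate,
        // keeping the fixed-delay schedule alive.
        pool.scheduleWithFixedDelay(
                new SafeRunnableWrapper(() -> System.out.println("report status")),
                1, 5, TimeUnit.SECONDS);
    }
}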

View File

@ -77,6 +77,10 @@ public class NetUtils {
* @return local IP address
*/
public static String getLocalHost() {
return getLocalHostWithNetworkInterfaceChecker(null);
}
public static String getLocalHostWithNetworkInterfaceChecker(NetworkInterfaceChecker networkInterfaceChecker) {
if (HOST_ADDRESS != null) {
return HOST_ADDRESS;
}
@ -87,31 +91,39 @@ public class NetUtils {
return HOST_ADDRESS = addressFromJVM;
}
InetAddress address = getLocalAddress();
InetAddress address = getLocalAddress(networkInterfaceChecker);
if (address != null) {
return HOST_ADDRESS = address.getHostAddress();
}
return LOCALHOST_VALUE;
}
/**
* Isolates the call scope: only core scenarios may call getLocalHost directly, which makes the call sites easy to locate.
* @return IP
*/
public static String getLocalHost4Test() {
return getLocalHost();
}
/**
* Find first valid IP from local network card
*
* @return first valid local IP
*/
public static InetAddress getLocalAddress() {
public static InetAddress getLocalAddress(NetworkInterfaceChecker networkInterfaceChecker) {
if (LOCAL_ADDRESS != null) {
return LOCAL_ADDRESS;
}
InetAddress localAddress = getLocalAddress0();
InetAddress localAddress = getLocalAddress0(networkInterfaceChecker);
LOCAL_ADDRESS = localAddress;
return localAddress;
}
private static InetAddress getLocalAddress0() {
private static InetAddress getLocalAddress0(NetworkInterfaceChecker networkInterfaceChecker) {
// @since 2.7.6, choose the {@link NetworkInterface} first
try {
InetAddress addressOp = getFirstReachableInetAddress( findNetworkInterface());
InetAddress addressOp = getFirstReachableInetAddress( findNetworkInterface(networkInterfaceChecker));
if (addressOp != null) {
return addressOp;
}
@ -161,7 +173,7 @@ public class NetUtils {
* @return If no {@link NetworkInterface} is available , return <code>null</code>
* @since 2.7.6
*/
public static NetworkInterface findNetworkInterface() {
public static NetworkInterface findNetworkInterface(NetworkInterfaceChecker networkInterfaceChecker) {
List<NetworkInterface> validNetworkInterfaces = emptyList();
try {
@ -176,7 +188,11 @@ public class NetUtils {
// Try to find the preferred one
for (NetworkInterface networkInterface : validNetworkInterfaces) {
if (isPreferredNetworkInterface(networkInterface)) {
log.info("[Net] use preferred network interface: {}", networkInterface.getDisplayName());
log.info("[Net] use preferred network interface: {}", networkInterface);
return networkInterface;
}
if (isPassedCheckNetworkInterface(networkInterface, networkInterfaceChecker)) {
log.info("[Net] use PassedCheckNetworkInterface: {}", networkInterface);
return networkInterface;
}
}
@ -191,6 +207,25 @@ public class NetUtils {
return first(validNetworkInterfaces);
}
/**
* Uses the user-supplied checker to decide whether this is the target network interface
* @param networkInterface networkInterface
* @param networkInterfaceChecker the checker to apply
* @return true or false
*/
static boolean isPassedCheckNetworkInterface(NetworkInterface networkInterface, NetworkInterfaceChecker networkInterfaceChecker) {
if (networkInterfaceChecker == null) {
return false;
}
log.info("[Net] try to choose NetworkInterface by NetworkInterfaceChecker, current NetworkInterface: {}", networkInterface);
try {
return networkInterfaceChecker.ok(networkInterface, getFirstReachableInetAddress(networkInterface));
} catch (Exception e) {
log.warn("[Net] isPassedCheckerNetworkInterface failed, current networkInterface: {}", networkInterface, e);
}
return false;
}
private static Optional<InetAddress> toValidAddress(InetAddress address) {
if (address instanceof Inet6Address) {
Inet6Address v6Address = (Inet6Address) address;
@ -344,4 +379,9 @@ public class NetUtils {
}
return false;
}
@FunctionalInterface
public interface NetworkInterfaceChecker {
boolean ok(NetworkInterface networkInterface, InetAddress inetAddress);
}
}
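
A rough sketch of how a caller might use the new hook (the selection criteria below are purely illustrative, not what PowerJob itself does): the checker receives each candidate NetworkInterface plus its first reachable address and returns true for the one to use.

import java.net.Inet4Address;
import tech.powerjob.common.utils.NetUtils;

public class NetworkInterfaceCheckerSketch {
    public static void main(String[] args) {
        // Prefer an interface whose reachable address is IPv4 and whose name
        // starts with "eth"; note inetAddress may be null for some interfaces.
        String ip = NetUtils.getLocalHostWithNetworkInterfaceChecker(
                (networkInterface, inetAddress) ->
                        inetAddress instanceof Inet4Address
                                && networkInterface.getName().startsWith("eth"));
        System.out.println("resolved local ip: " + ip);
    }
}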

View File

@ -0,0 +1,14 @@
package tech.powerjob.common.utils.net;
import java.io.Closeable;
/**
* Socket server used for connectivity testing
*
* @author tjq
* @since 2024/2/8
*/
public interface PingPongServer extends Closeable {
void initialize(int port) throws Exception;
}

View File

@ -0,0 +1,57 @@
package tech.powerjob.common.utils.net;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.utils.CommonUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
/**
* Simple socket server
*
* @author tjq
* @since 2024/2/8
*/
@Slf4j
public class PingPongSocketServer implements PingPongServer {
private Thread thread;
private ServerSocket serverSocket;
private volatile boolean terminated = false;
@Override
public void initialize(int port) throws Exception{
serverSocket = new ServerSocket(port);
thread = new Thread(() -> {
while (true) {
if (terminated) {
return;
}
// Accept a connection; accept() blocks when no connection is pending
try (Socket socket = serverSocket.accept();OutputStream outputStream = socket.getOutputStream();) {
outputStream.write(PingPongUtils.PONG.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
} catch (Exception e) {
if (!terminated) {
log.warn("[PingPongSocketServer] process accepted socket failed!", e);
}
}
}
}, "PingPongSocketServer-Thread");
thread.start();
}
@Override
public void close() throws IOException {
terminated = true;
CommonUtils.executeIgnoreException(() -> serverSocket.close());
thread.interrupt();
}
}

View File

@ -0,0 +1,53 @@
package tech.powerjob.common.utils.net;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import java.io.*;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
/**
* Socket connectivity helper
*
* @author tjq
* @since 2024/2/8
*/
@Slf4j
public class PingPongUtils {
static final String PING = "ping";
static final String PONG = "pong";
/**
* Verifies connectivity to the target IP and port
* @param targetIp target IP
* @param targetPort target port
* @return true or false
*/
public static boolean checkConnectivity(String targetIp, int targetPort) {
try (Socket s = new Socket(targetIp, targetPort);InputStream is = s.getInputStream();OutputStream os = s.getOutputStream();BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
// Send the PING request
os.write(PING.getBytes(StandardCharsets.UTF_8));
os.flush();
// Read the message returned by the server
String content = br.readLine();
if (PONG.equalsIgnoreCase(content)) {
return true;
}
} catch (UnknownHostException e) {
log.warn("[SocketConnectivityUtils] unknown host: {}:{}", targetIp, targetPort);
} catch (IOException e) {
log.warn("[SocketConnectivityUtils] IOException: {}:{}, msg: {}", targetIp, targetPort, ExceptionUtils.getMessage(e));
} catch (Exception e) {
log.error("[SocketConnectivityUtils] unknown exception for check ip: {}:{}", targetIp, targetPort, e);
}
return false;
}
}

View File

@ -12,19 +12,19 @@ public class NetUtilsTest {
@Test
public void testOrigin() {
System.out.println(NetUtils.getLocalHost());
System.out.println(NetUtils.getLocalHost4Test());
}
@Test
public void testPreferredNetworkInterface() {
System.setProperty(PowerJobDKey.PREFERRED_NETWORK_INTERFACE, "en5");
System.out.println(NetUtils.getLocalHost());
System.out.println(NetUtils.getLocalHost4Test());
}
@Test
public void testIgnoredNetworkInterface() {
System.setProperty(PowerJobDKey.IGNORED_NETWORK_INTERFACE_REGEX, "utun.|llw.");
System.out.println(NetUtils.getLocalHost());
System.out.println(NetUtils.getLocalHost4Test());
}
}

View File

@ -0,0 +1,31 @@
package tech.powerjob.common.utils.net;
import org.junit.jupiter.api.Test;
import tech.powerjob.common.utils.NetUtils;
/**
* desc
*
* @author tjq
* @since 2024/2/8
*/
class PingPongSocketServerTest {
@Test
void test() throws Exception {
int port = 8877;
PingPongSocketServer pingPongSocketServer = new PingPongSocketServer();
pingPongSocketServer.initialize(port);
System.out.println("[PingPongSocketServerTest] finished initialize");
assert PingPongUtils.checkConnectivity(NetUtils.getLocalHost(), port);
assert !PingPongUtils.checkConnectivity(NetUtils.getLocalHost(), port + 1);
pingPongSocketServer.close();
System.out.println("[PingPongSocketServerTest] finished close");
}
}

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-official-processors</artifactId>
<version>4.3.6</version>
<version>4.3.7</version>
<packaging>jar</packaging>
<properties>
@ -19,12 +19,11 @@
<!-- Parts that will not be packaged; scope can only be test or provided -->
<junit.version>5.9.1</junit.version>
<logback.version>1.2.9</logback.version>
<powerjob.worker.version>4.3.6</powerjob.worker.version>
<spring.jdbc.version>5.2.9.RELEASE</spring.jdbc.version>
<h2.db.version>2.2.220</h2.db.version>
<logback.version>1.2.13</logback.version>
<powerjob.worker.version>4.3.7</powerjob.worker.version>
<h2.db.version>2.2.224</h2.db.version>
<mysql.version>8.0.28</mysql.version>
<spring.version>5.3.23</spring.version>
<spring.version>5.3.31</spring.version>
<!-- Shade everything to avoid dependency conflicts -->
<fastjson.version>1.2.83</fastjson.version>
@ -87,7 +86,7 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.jdbc.version}</version>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>

View File

@ -123,14 +123,14 @@ public class VerificationProcessor extends CommonBasicProcessor implements MapRe
@Override
public ProcessResult preProcess(TaskContext context) throws Exception {
context.getOmsLogger().info("start to preProcess, current worker IP is {}.", NetUtils.getLocalHost());
context.getOmsLogger().info("start to preProcess, current worker IP is {}.", NetUtils.getLocalHost4Test());
return new ProcessResult(true, "preProcess successfully!");
}
@Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("start to postProcess, current worker IP is {}.", NetUtils.getLocalHost());
omsLogger.info("start to postProcess, current worker IP is {}.", NetUtils.getLocalHost4Test());
omsLogger.info("====== All Node's Process Result ======");
taskResults.forEach(r -> omsLogger.info("taskId:{},success:{},result:{}", r.getTaskId(), r.isSuccess(), r.getResult()));
return new ProcessResult(true, "postProcess successfully!");

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
@ -24,7 +24,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<junit.version>5.9.0</junit.version>
<logback.version>1.2.9</logback.version>
<logback.version>1.2.13</logback.version>
</properties>
<dependencies>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -19,10 +19,10 @@
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
<maven-jar-plugin.version>3.2.2</maven-jar-plugin.version>
<logback.version>1.2.9</logback.version>
<springboot.version>2.7.14</springboot.version>
<powerjob-remote-impl-http.version>4.3.6</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.3.6</powerjob-remote-impl-akka.version>
<logback.version>1.2.13</logback.version>
<springboot.version>2.7.18</springboot.version>
<powerjob-remote-impl-http.version>4.3.7</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.3.7</powerjob-remote-impl-akka.version>
<gatling.version>3.9.0</gatling.version>
<gatling-maven-plugin.version>4.2.9</gatling-maven-plugin.version>

View File

@ -5,11 +5,11 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<version>4.3.6</version>
<version>4.3.7</version>
<artifactId>powerjob-remote-framework</artifactId>
<properties>
@ -17,7 +17,7 @@
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<powerjob-common.version>4.3.6</powerjob-common.version>
<powerjob-common.version>4.3.7</powerjob-common.version>
<reflections.version>0.10.2</reflections.version>

View File

@ -5,19 +5,19 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-akka</artifactId>
<version>4.3.6</version>
<version>4.3.7</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<powerjob-remote-framework.version>4.3.6</powerjob-remote-framework.version>
<powerjob-remote-framework.version>4.3.7</powerjob-remote-framework.version>
<akka.version>2.6.13</akka.version>
</properties>

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-http</artifactId>
<version>4.3.6</version>
<version>4.3.7</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
@ -18,7 +18,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<vertx.version>4.3.7</vertx.version>
<powerjob-remote-framework.version>4.3.6</powerjob-remote-framework.version>
<powerjob-remote-framework.version>4.3.7</powerjob-remote-framework.version>
</properties>
<dependencies>

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>4.3.6</version>
<version>4.3.7</version>
<packaging>pom</packaging>
<modules>
@ -26,7 +26,7 @@
<properties>
<springboot.version>2.7.14</springboot.version>
<springboot.version>2.7.18</springboot.version>
<!-- MySQL version that corresponds to spring-boot-dependencies version. -->
<mysql.version>8.0.33</mysql.version>
@ -34,7 +34,7 @@
<mssql-jdbc.version>7.4.1.jre8</mssql-jdbc.version>
<db2-jdbc.version>11.5.0.0</db2-jdbc.version>
<postgresql.version>42.6.0</postgresql.version>
<h2.db.version>2.2.220</h2.db.version>
<h2.db.version>2.2.224</h2.db.version>
<mongodb-driver-sync.version>4.10.2</mongodb-driver-sync.version>
<zip4j.version>2.11.2</zip4j.version>
@ -50,9 +50,9 @@
<groovy.version>3.0.10</groovy.version>
<cron-utils.version>9.2.1</cron-utils.version>
<powerjob-common.version>4.3.6</powerjob-common.version>
<powerjob-remote-impl-http.version>4.3.6</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.3.6</powerjob-remote-impl-akka.version>
<powerjob-common.version>4.3.7</powerjob-common.version>
<powerjob-remote-impl-http.version>4.3.7</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.3.7</powerjob-remote-impl-akka.version>
<springdoc-openapi-ui.version>1.6.14</springdoc-openapi-ui.version>
<aliyun-sdk-oss.version>3.17.1</aliyun-sdk-oss.version>
<minio.version>8.5.2</minio.version>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -44,7 +44,7 @@ public class MailAlarmService implements Alarmable {
SimpleMailMessage sm = new SimpleMailMessage();
try {
sm.setFrom(from);
sm.setTo(targetUserList.stream().map(AlarmTarget::getEmail).filter(Objects::nonNull).toArray(String[]::new));
sm.setTo(targetUserList.stream().map(AlarmTarget::getEmail).filter(Objects::nonNull).filter(email -> !email.isEmpty()).toArray(String[]::new));
sm.setSubject(alarm.fetchTitle());
sm.setText(alarm.fetchContent());

View File

@ -260,10 +260,12 @@ public class InstanceService {
/**
* Gets the detailed runtime information of a job instance
*
* @param appId used for remote server routing; do not remove
* @param instanceId job instance ID
* @return detailed runtime status
*/
public InstanceDetail getInstanceDetail(Long instanceId) {
@DesignateServer
public InstanceDetail getInstanceDetail(Long appId, Long instanceId) {
InstanceInfoDO instanceInfoDO = fetchInstanceInfo(instanceId);

View File

@ -67,8 +67,11 @@ public class CoreScheduleTaskManager implements InitializingBean, DisposableBean
log.info("start task : {}.", taskName);
while (true) {
try {
innerRunnable.run();
// Invert the order to sleep first and then execute, fixing the issue of the while(true) loop spamming logs when exceptions occur: https://github.com/PowerJob/PowerJob/issues/769
Thread.sleep(runningInterval);
innerRunnable.run();
} catch (InterruptedException e) {
log.warn("[{}] task has been interrupted!", taskName, e);
break;

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -6,6 +6,8 @@ import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import tech.powerjob.server.common.aware.ServerInfoAware;
import tech.powerjob.server.common.module.ServerInfo;
import tech.powerjob.server.extension.dfs.DFsService;
/**
@ -15,8 +17,9 @@ import tech.powerjob.server.extension.dfs.DFsService;
* @since 2023/7/28
*/
@Slf4j
public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean {
public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, ServerInfoAware, DisposableBean {
protected ServerInfo serverInfo;
protected ApplicationContext applicationContext;
public AbstractDFsService() {
@ -38,4 +41,9 @@ public abstract class AbstractDFsService implements DFsService, ApplicationConte
log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName());
init(applicationContext);
}
@Override
public void setServerInfo(ServerInfo serverInfo) {
this.serverInfo = serverInfo;
}
}

View File

@ -139,8 +139,9 @@ public class MySqlSeriesDfsService extends AbstractDFsService {
deleteByLocation(fileLocation);
Map<String, Object> meta = Maps.newHashMap();
meta.put("_server_", NetUtils.getLocalHost());
meta.put("_server_", serverInfo.getIp());
meta.put("_local_file_path_", storeRequest.getLocalFile().getAbsolutePath());
BufferedInputStream bufferedInputStream = new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath()));
Date date = new Date(System.currentTimeMillis());
@ -153,7 +154,7 @@ public class MySqlSeriesDfsService extends AbstractDFsService {
pst.setString(4, JsonUtils.toJSONString(meta));
pst.setLong(5, storeRequest.getLocalFile().length());
pst.setInt(6, SwitchableStatus.ENABLE.getV());
pst.setBlob(7, new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath())));
pst.setBlob(7, bufferedInputStream);
pst.setString(8, null);
pst.setDate(9, date);
pst.setDate(10, date);
@ -165,6 +166,8 @@ public class MySqlSeriesDfsService extends AbstractDFsService {
} catch (Exception e) {
log.error("[MySqlSeriesDfsService] store [{}] failed!", fileLocation);
ExceptionUtils.rethrow(e);
}finally {
bufferedInputStream.close();
}
}

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -6,7 +6,6 @@ import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.persistence.PageResult;
import tech.powerjob.server.persistence.StringPage;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.core.service.CacheService;
import tech.powerjob.server.core.instance.InstanceLogService;
@ -69,8 +68,14 @@ public class InstanceController {
}
@GetMapping("/detail")
public ResultDTO<InstanceDetailVO> getInstanceDetail(Long instanceId) {
return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(instanceId)));
public ResultDTO<InstanceDetailVO> getInstanceDetail(Long appId, Long instanceId) {
// Compatibility with older front-end versions where appId is not supplied
if (appId == null) {
appId = instanceService.getInstanceInfo(instanceId).getAppId();
}
return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(appId, instanceId)));
}
@GetMapping("/log")

View File

@ -11,7 +11,7 @@ import tech.powerjob.common.model.WorkerAppInfo;
import tech.powerjob.common.request.ServerDiscoveryRequest;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.net.PingPongUtils;
import tech.powerjob.server.common.aware.ServerInfoAware;
import tech.powerjob.server.common.module.ServerInfo;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
@ -66,10 +66,21 @@ public class ServerController implements ServerInfoAware {
return ResultDTO.success(serverElectionService.elect(request));
}
@GetMapping("/checkConnectivity")
public ResultDTO<Boolean> checkConnectivity(String targetIp, Integer targetPort) {
try {
boolean ret = PingPongUtils.checkConnectivity(targetIp, targetPort);
return ResultDTO.success(ret);
} catch (Throwable t) {
return ResultDTO.failed(t);
}
}
@GetMapping("/hello")
public ResultDTO<JSONObject> ping(@RequestParam(required = false) boolean debug) {
JSONObject res = new JSONObject();
res.put("localHost", NetUtils.getLocalHost());
res.put("localHost", serverInfo.getIp());
res.put("serverInfo", serverInfo);
res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis()));
res.put("serverTimeTs", System.currentTimeMillis());

View File

@ -53,7 +53,7 @@ public class RepositoryTest {
public void testBatchLock() {
List<OmsLockDO> locks = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
OmsLockDO lockDO = new OmsLockDO("lock" + i, NetUtils.getLocalHost(), 10000L);
OmsLockDO lockDO = new OmsLockDO("lock" + i, NetUtils.getLocalHost4Test(), 10000L);
locks.add(lockDO);
}
omsLockRepository.saveAll(locks);
@ -63,7 +63,7 @@ public class RepositoryTest {
@Test
public void testDeleteLock() {
String lockName = "test-lock";
OmsLockDO lockDO = new OmsLockDO(lockName, NetUtils.getLocalHost(), 10000L);
OmsLockDO lockDO = new OmsLockDO(lockName, NetUtils.getLocalHost4Test(), 10000L);
omsLockRepository.save(lockDO);
omsLockRepository.deleteByLockName(lockName);
}

View File

@ -5,24 +5,24 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>4.3.6</version>
<version>4.3.7</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>4.3.6</powerjob.worker.version>
<logback.version>1.2.9</logback.version>
<powerjob.worker.version>4.3.7</powerjob.worker.version>
<logback.version>1.2.13</logback.version>
<picocli.version>4.3.2</picocli.version>
<spring.version>5.3.23</spring.version>
<spring.version>5.3.31</spring.version>
<spring.boot.version>2.3.4.RELEASE</spring.boot.version>
<powerjob.official.processors.version>4.3.6</powerjob.official.processors.version>
<powerjob.official.processors.version>4.3.7</powerjob.official.processors.version>
<!-- dependency for dynamic sql processor -->
<mysql.version>8.0.28</mysql.version>

View File

@ -5,18 +5,18 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>4.3.6</version>
<version>4.3.7</version>
<properties>
<springboot.version>2.7.14</springboot.version>
<powerjob.worker.starter.version>4.3.6</powerjob.worker.starter.version>
<springboot.version>2.7.18</springboot.version>
<powerjob.worker.starter.version>4.3.7</powerjob.worker.starter.version>
<fastjson.version>1.2.83</fastjson.version>
<powerjob.official.processors.version>4.3.6</powerjob.official.processors.version>
<powerjob.official.processors.version>4.3.7</powerjob.official.processors.version>
<!-- Skip this module during deployment -->
<maven.deploy.skip>true</maven.deploy.skip>

View File

@ -0,0 +1,16 @@
package tech.powerjob.samples.anno;
import java.lang.annotation.*;
/**
* Custom method annotation
* <a href="https://github.com/PowerJob/PowerJob/issues/770">Custom annotation causes @PowerJobHandler to stop working</a>
*
* @author tjq
* @since 2024/2/8
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface ATestMethodAnnotation {
}

View File

@ -24,7 +24,7 @@ public class BroadcastProcessorDemo implements BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext context) {
System.out.println("===== BroadcastProcessorDemo#preProcess ======");
context.getOmsLogger().info("BroadcastProcessorDemo#preProcess, current host: {}", NetUtils.getLocalHost());
context.getOmsLogger().info("BroadcastProcessorDemo#preProcess, current host: {}", NetUtils.getLocalHost4Test());
if ("rootFailed".equals(context.getJobParams())) {
return new ProcessResult(false, "console need failed");
} else {
@ -36,7 +36,7 @@ public class BroadcastProcessorDemo implements BroadcastProcessor {
public ProcessResult process(TaskContext taskContext) throws Exception {
OmsLogger logger = taskContext.getOmsLogger();
System.out.println("===== BroadcastProcessorDemo#process ======");
logger.info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost());
logger.info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost4Test());
long sleepTime = 1000;
try {
sleepTime = Long.parseLong(taskContext.getJobParams());
@ -50,7 +50,7 @@ public class BroadcastProcessorDemo implements BroadcastProcessor {
@Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) {
System.out.println("===== BroadcastProcessorDemo#postProcess ======");
context.getOmsLogger().info("BroadcastProcessorDemo#postProcess, current host: {}, taskResult: {}", NetUtils.getLocalHost(), taskResults);
context.getOmsLogger().info("BroadcastProcessorDemo#postProcess, current host: {}, taskResult: {}", NetUtils.getLocalHost4Test(), taskResults);
return new ProcessResult(true, "success");
}
}

View File

@ -2,6 +2,7 @@ package tech.powerjob.samples.processors;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.*;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
@ -9,10 +10,6 @@ import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import tech.powerjob.worker.log.OmsLogger;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -85,10 +82,18 @@ public class MapReduceProcessorDemo implements MapReduceProcessor {
}
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public static class TestSubTask {
/**
* Note: the class representing sub-task parameters MUST have a no-argument constructor (this really matters).
* It is also best to add getters/setters to reduce the chance of serialization problems.
*/
public TestSubTask() {
}
private String name;
private int age;
}

View File

@ -1,6 +1,7 @@
package tech.powerjob.samples.tester;
import org.springframework.stereotype.Component;
import tech.powerjob.samples.anno.ATestMethodAnnotation;
import tech.powerjob.worker.annotation.PowerJobHandler;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.log.OmsLogger;
@ -33,4 +34,12 @@ public class SpringMethodProcessorService {
omsLogger.warn("testThrowException");
throw new IllegalArgumentException("test");
}
@ATestMethodAnnotation
@PowerJobHandler(name = "testNormalReturnWithCustomAnno")
public String testNormalReturnWithCustomAnno(TaskContext context) {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.warn("测试自定义注解");
return "testNormalReturnWithCustomAnno";
}
}

View File

@ -5,17 +5,17 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>4.3.6</version>
<version>4.3.7</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>4.3.6</powerjob.worker.version>
<springboot.version>2.7.14</springboot.version>
<powerjob.worker.version>4.3.7</powerjob.worker.version>
<springboot.version>2.7.18</springboot.version>
</properties>
<dependencies>

View File

@ -5,26 +5,26 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.6</version>
<version>4.3.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>4.3.6</version>
<version>4.3.7</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.3.23</spring.version>
<h2.db.version>2.2.220</h2.db.version>
<spring.version>5.3.31</spring.version>
<h2.db.version>2.2.224</h2.db.version>
<hikaricp.version>4.0.3</hikaricp.version>
<junit.version>5.9.1</junit.version>
<logback.version>1.2.9</logback.version>
<logback.version>1.2.13</logback.version>
<powerjob-common.version>4.3.6</powerjob-common.version>
<powerjob-remote-framework.version>4.3.6</powerjob-remote-framework.version>
<powerjob-remote-impl-akka.version>4.3.6</powerjob-remote-impl-akka.version>
<powerjob-remote-impl-http.version>4.3.6</powerjob-remote-impl-http.version>
<powerjob-common.version>4.3.7</powerjob-common.version>
<powerjob-remote-framework.version>4.3.7</powerjob-remote-framework.version>
<powerjob-remote-impl-akka.version>4.3.7</powerjob-remote-impl-akka.version>
<powerjob-remote-impl-http.version>4.3.7</powerjob-remote-impl-http.version>
</properties>
<dependencies>

View File

@ -6,7 +6,6 @@ import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.model.WorkerAppInfo;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.PropertyUtils;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.ServerType;
@ -24,6 +23,7 @@ import tech.powerjob.worker.background.discovery.ServerDiscoveryService;
import tech.powerjob.worker.common.PowerBannerPrinter;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.WorkerNetUtils;
import tech.powerjob.worker.core.executor.ExecutorManager;
import tech.powerjob.worker.extension.processor.ProcessorFactory;
import tech.powerjob.worker.persistence.TaskPersistenceService;
@ -74,13 +74,16 @@ public class PowerJobWorker {
try {
PowerBannerPrinter.print();
// Resolve the real IP before the first request is sent
int localBindPort = config.getPort();
String localBindIp = WorkerNetUtils.parseLocalBindIp(localBindPort, config.getServerAddress());
// Validate appName
WorkerAppInfo appInfo = serverDiscoveryService.assertApp();
workerRuntime.setAppInfo(appInfo);
// Initialize network data: the reported (external) address and the local bind address are handled separately; the reported address is used externally
String localBindIp = NetUtils.getLocalHost();
int localBindPort = config.getPort();
String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, localBindIp);
String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort));
log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort);

View File

@ -1,5 +1,6 @@
package tech.powerjob.worker.background;
import tech.powerjob.common.enhance.SafeRunnable;
import tech.powerjob.common.enums.LogLevel;
import tech.powerjob.common.model.InstanceLogContent;
import tech.powerjob.common.request.WorkerLogReportReq;
@ -69,10 +70,10 @@ public class OmsLogHandler {
private class LogSubmitter implements Runnable {
private class LogSubmitter extends SafeRunnable {
@Override
public void run() {
public void run0() {
boolean lockResult = reportLock.tryLock();
if (!lockResult) {

View File

@ -3,6 +3,7 @@ package tech.powerjob.worker.background;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.enhance.SafeRunnable;
import tech.powerjob.common.model.SystemMetrics;
import tech.powerjob.common.request.WorkerHeartbeat;
import tech.powerjob.worker.common.PowerJobWorkerVersion;
@ -22,12 +23,12 @@ import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager;
*/
@Slf4j
@RequiredArgsConstructor
public class WorkerHealthReporter implements Runnable {
public class WorkerHealthReporter extends SafeRunnable {
private final WorkerRuntime workerRuntime;
@Override
public void run() {
public void run0() {
// No server is available, so nothing can be reported
String currentServer = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();

View File

@ -0,0 +1,76 @@
package tech.powerjob.worker.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.net.PingPongServer;
import tech.powerjob.common.utils.net.PingPongSocketServer;
import java.util.List;
/**
* Network utilities dedicated to the PowerJob worker
*
* @author tjq
* @since 2024/2/8
*/
@Slf4j
public class WorkerNetUtils {
private static final String SERVER_CONNECTIVITY_CHECK_URL_PATTERN = "http://%s/server/checkConnectivity?targetIp=%s&targetPort=%d";
/**
* Resolves the local IP address that can communicate with the server when multiple network interfaces are present
* @param port target port
* @param serverAddress server addresses
* @return local IP
*/
public static String parseLocalBindIp(int port, List<String> serverAddress) {
PingPongServer pingPongServer = null;
try {
pingPongServer = new PingPongSocketServer();
pingPongServer.initialize(port);
log.info("[WorkerNetUtils] initialize PingPongSocketServer successfully~");
} catch (Exception e) {
log.warn("[WorkerNetUtils] PingPongSocketServer failed to start, which may result in an incorrectly bound IP, please pay attention to the initialize log.", e);
}
String localHostWithNetworkInterfaceChecker = NetUtils.getLocalHostWithNetworkInterfaceChecker(((networkInterface, inetAddress) -> {
if (inetAddress == null) {
return false;
}
String workerIp = inetAddress.getHostAddress();
for (String address : serverAddress) {
String url = String.format(SERVER_CONNECTIVITY_CHECK_URL_PATTERN, address, workerIp, port);
try {
String resp = HttpUtils.get(url);
log.info("[WorkerNetUtils] check connectivity by url[{}], response: {}", url, resp);
if (StringUtils.isNotEmpty(resp)) {
ResultDTO<?> resultDTO = JsonUtils.parseObject(resp, ResultDTO.class);
return Boolean.TRUE.toString().equalsIgnoreCase(String.valueOf(resultDTO.getData()));
}
} catch (Exception ignore) {
}
}
return false;
}));
if (pingPongServer != null) {
try {
pingPongServer.close();
log.info("[WorkerNetUtils] close PingPongSocketServer successfully~");
} catch (Exception e) {
log.warn("[WorkerNetUtils] close PingPongSocketServer failed!", e);
}
}
return localHostWithNetworkInterfaceChecker;
}
}

View File

@ -9,6 +9,7 @@ import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.enhance.SafeRunnable;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.TransportUtils;
@ -237,11 +238,11 @@ public class ProcessorTracker {
/**
* Periodically reports a heartbeat carrying task execution information to the TaskTracker
*/
private class CheckerAndReporter implements Runnable {
private class CheckerAndReporter extends SafeRunnable {
@Override
@SuppressWarnings({"squid:S1066","squid:S3776"})
public void run() {
public void run0() {
// Timeout check; if the task has timed out, the TaskTracker is shut down automatically
long interval = System.currentTimeMillis() - startTime;

View File

@ -2,6 +2,7 @@ package tech.powerjob.worker.core.tracker.task;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerScheduleJobReq;
@ -33,6 +34,7 @@ public abstract class TaskTracker {
* Job instance information
*/
protected final InstanceInfo instanceInfo;
protected final ExecuteType executeType;
/**
* Appended workflow context data
*
@ -75,6 +77,9 @@ public abstract class TaskTracker {
instanceInfo.setLogConfig(req.getLogConfig());
instanceInfo.setInstanceTimeoutMS(req.getInstanceTimeoutMS());
// Initialize commonly used fields
executeType = ExecuteType.valueOf(req.getExecuteType());
// Special handling for the timeout value
if (instanceInfo.getInstanceTimeoutMS() <= 0) {
instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE);

View File

@ -19,6 +19,7 @@ import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.common.enhance.SafeRunnable;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
@ -83,7 +84,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress());
this.taskPersistenceService = workerRuntime.getTaskPersistenceService();
// Build the cache
taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).build();
taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).softValues().build();
// Build the segment lock
segmentLock = new SegmentLock(UPDATE_CONCURRENCY);
@ -226,7 +227,6 @@ public abstract class HeavyTaskTracker extends TaskTracker {
3. Broadcast tasks must run on every machine, so the worker should not be reassigned; a broadcast task's address must not be modified
*/
String taskName = taskOpt.get().getTaskName();
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) {
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
}
@ -243,8 +243,8 @@ public abstract class HeavyTaskTracker extends TaskTracker {
}
}
// Retry when the status update fails; if even the DB write fails, give up retrying... just bad luck...
result = result == null ? "" : result;
// Retry when the status update fails; if even the DB write fails, give up retrying... just bad luck... 2024-02-04 update: for large-scale MAP jobs, in pursuit of maximum performance, do not persist useless sub-task results
result = result == null || ExecuteType.MAP.equals(executeType) ? "" : result;
boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);
if (!updateResult) {
@ -445,13 +445,13 @@ public abstract class HeavyTaskTracker extends TaskTracker {
/**
* Periodically scans tasks in the database (at most 100 per scan, to limit memory usage) and dispatches the tasks that need to run
*/
protected class Dispatcher implements Runnable {
protected class Dispatcher extends SafeRunnable {
// Database query limit: the maximum number of tasks fetched per query
private static final int DB_QUERY_LIMIT = 100;
@Override
public void run() {
public void run0() {
if (finished.get()) {
return;
@ -503,9 +503,9 @@ public abstract class HeavyTaskTracker extends TaskTracker {
* Dynamically brings executors online (for second-level and MapReduce jobs)
* Principle: the executor status queried from the server never overrides the state maintained by the worker itself, i.e. only additions are made, never modifications
*/
protected class WorkerDetector implements Runnable {
protected class WorkerDetector extends SafeRunnable {
@Override
public void run() {
public void run0() {
boolean needMoreWorker = ptStatusHolder.checkNeedMoreWorker();
log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", instanceId, needMoreWorker);

View File

@ -9,6 +9,7 @@ import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.enhance.SafeRunnableWrapper;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
@ -93,14 +94,14 @@ public class LightTaskTracker extends TaskTracker {
// Add a random value to the initial delay so that, under high concurrency, requests do not all cluster in the same time window
long initDelay = RandomUtils.nextInt(5000, 10000);
// Report task status
statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS);
statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(new SafeRunnableWrapper(this::checkAndReportStatus), initDelay, delay, TimeUnit.MILLISECONDS);
// Timeout control
if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) {
if (instanceInfo.getInstanceTimeoutMS() < 1000L) {
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new SafeRunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
} else {
// For tasks running longer than 1s, the minimum granularity of timeout detection is 1s
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new SafeRunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
}
} else {
timeoutCheckScheduledFuture = null;
@ -148,7 +149,7 @@ public class LightTaskTracker extends TaskTracker {
}
LightTaskTrackerManager.removeTaskTracker(instanceId);
// The last column is the total elapsed time, i.e. how long resources were occupied: current time minus creation time
log.warn("[TaskTracker-{}] remove TaskTracker,task status {},start time:{},end time:{},real cost:{},total time:{}", instanceId, status, taskStartTime, taskEndTime, taskEndTime != null ? taskEndTime - taskStartTime : "unknown", System.currentTimeMillis() - createTime);
log.info("[TaskTracker-{}] remove TaskTracker,task status {},start time:{},end time:{},real cost:{},total time:{}", instanceId, status, taskStartTime, taskEndTime, taskEndTime != null ? taskEndTime - taskStartTime : "unknown", System.currentTimeMillis() - createTime);
}
@Override

View File

@ -17,11 +17,20 @@ import java.util.Map;
* @author tjq
* @since 2020/3/17
*/
@AllArgsConstructor
public class TaskDAOImpl implements TaskDAO {
private final boolean useIndex;
private final ConnectionFactory connectionFactory;
public TaskDAOImpl(ConnectionFactory connectionFactory) {
this(false, connectionFactory);
}
public TaskDAOImpl(boolean useIndex, ConnectionFactory connectionFactory) {
this.useIndex = useIndex;
this.connectionFactory = connectionFactory;
}
@Override
public void initTable() throws Exception {
@ -30,9 +39,14 @@ public class TaskDAOImpl implements TaskDAO {
// bigint(20) has exactly the same value range as Java's Long
String createTableSQL = "create table task_info (task_id varchar(255), instance_id bigint, sub_instance_id bigint, task_name varchar(255), task_content blob, address varchar(255), status int, result text, failed_cnt int, created_time bigint, last_modified_time bigint, last_report_time bigint, constraint pkey unique (instance_id, task_id))";
String createIndexSQL = "create INDEX idx_status ON task_info (status)";
try (Connection conn = connectionFactory.getConnection(); Statement stat = conn.createStatement()) {
stat.execute(delTableSQL);
stat.execute(createTableSQL);
if (useIndex) {
stat.execute(createIndexSQL);
}
}
}

View File

@ -13,10 +13,10 @@ import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.core.processor.TaskResult;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
/**
* Task persistence service
@ -36,6 +36,11 @@ public class TaskPersistenceService {
private static final long RETRY_INTERVAL_MS = 100;
/**
* Slow query threshold: 200ms
*/
private static final long SLOW_QUERY_RT_THRESHOLD = 200;
private TaskDAO taskDAO;
public TaskPersistenceService(StoreStrategy strategy) {
@ -54,7 +59,7 @@ public class TaskPersistenceService {
public boolean save(TaskDO task) {
try {
return execute(() -> taskDAO.save(task));
return execute(() -> taskDAO.save(task), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] taskId={} save cost {}ms", task.getInstanceId(), task.getTaskId(), cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] save task{} failed.", task, e);
}
@ -66,7 +71,7 @@ public class TaskPersistenceService {
return true;
}
try {
return execute(() -> taskDAO.batchSave(tasks));
return execute(() -> taskDAO.batchSave(tasks), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] batchSave cost {}ms", tasks.get(0).getInstanceId(), cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e);
}
@ -80,7 +85,7 @@ public class TaskPersistenceService {
try {
updateEntity.setLastModifiedTime(System.currentTimeMillis());
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
return execute(() -> taskDAO.simpleUpdate(query, updateEntity));
return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTask(taskId={}) cost {}ms", instanceId, taskId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] updateTask failed.", e);
}
@ -92,7 +97,7 @@ public class TaskPersistenceService {
*/
public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) {
try {
return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result));
return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTaskStatus(taskId={}) cost {}ms", instanceId, taskId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] updateTaskStatus failed.", e);
}
@ -125,7 +130,7 @@ public class TaskPersistenceService {
log.debug("[TaskPersistenceService] updateLostTasks-QUERY-SQL: {}", query.getQueryCondition());
try {
return execute(() -> taskDAO.simpleUpdate(query, updateEntity));
return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateLostTasks cost {}ms", instanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] updateLostTasks failed.", e);
}
@ -148,7 +153,7 @@ public class TaskPersistenceService {
return Optional.empty();
}
return Optional.of(taskDOS.get(0));
});
}, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getLastTask cost {}ms", instanceId, subInstanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] get last task for instance(id={}) failed.", instanceId, e);
}
@ -161,7 +166,7 @@ public class TaskPersistenceService {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setSubInstanceId(subInstanceId);
return execute(() -> taskDAO.simpleQuery(query));
return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTask cost {}ms", instanceId, subInstanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e);
}
@ -178,7 +183,7 @@ public class TaskPersistenceService {
query.setAddress(address);
query.setQueryCondition(condition);
return execute(() -> taskDAO.simpleQuery(query));
return execute(() -> taskDAO.simpleQuery(query) , cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getAllUnFinishedTaskByAddress({}) cost {}ms", instanceId, address, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getAllTaskByAddress for instance(id={}) failed.", instanceId, e);
}
@ -194,7 +199,7 @@ public class TaskPersistenceService {
query.setInstanceId(instanceId);
query.setStatus(status.getValue());
query.setLimit(limit);
return execute(() -> taskDAO.simpleQuery(query));
return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTaskByStatus({}) cost {}ms", instanceId, status, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskByStatus failed, params is instanceId={},status={}.", instanceId, status, e);
}
@ -224,7 +229,7 @@ public class TaskPersistenceService {
result.put(TaskStatus.of(status), num);
});
return result;
});
}, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getTaskStatusStatistics cost {}ms", instanceId, subInstanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e);
}
@ -236,31 +241,13 @@ public class TaskPersistenceService {
*/
public List<TaskResult> getAllTaskResult(Long instanceId, Long subInstanceId) {
try {
return execute(() -> taskDAO.getAllTaskResult(instanceId, subInstanceId));
return execute(() -> taskDAO.getAllTaskResult(instanceId, subInstanceId), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTaskResult cost {}ms", instanceId, subInstanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskId2ResultMap for instance(id={}) failed.", instanceId, e);
}
return Lists.newLinkedList();
}
/**
* Queries only the task status column to save I/O -> tests show a striking gain even on a high-end NVMe SSD, let alone an ordinary HDD... disk I/O really is a major bottleneck...
*/
public Optional<TaskStatus> getTaskStatus(Long instanceId, String taskId) {
try {
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
query.setQueryContent("status");
return execute(() -> {
List<Map<String, Object>> rows = taskDAO.simpleQueryPlus(query);
return Optional.of(TaskStatus.of((int) rows.get(0).get("status")));
});
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskStatus failed, instanceId={},taskId={}.", instanceId, taskId, e);
}
return Optional.empty();
}
/**
* Queries a Task by primary key
*/
@ -273,7 +260,7 @@ public class TaskPersistenceService {
return Optional.empty();
}
return Optional.of(res.get(0));
});
}, cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTask(taskId={}) cost {}ms", instanceId, taskId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getTask failed, instanceId={},taskId={}.", instanceId, taskId, e);
}
@ -281,35 +268,11 @@ public class TaskPersistenceService {
}
/**
* Batch-updates Task status
*/
public boolean batchUpdateTaskStatus(Long instanceId, List<String> taskIds, TaskStatus status, String result) {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setQueryCondition(String.format(" task_id in %s ", CommonUtils.getInStringCondition(taskIds)));
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(status.getValue());
updateEntity.setResult(result);
return taskDAO.simpleUpdate(query, updateEntity);
});
}catch (Exception e) {
log.error("[TaskPersistenceService] updateTaskStatus failed, instanceId={},taskIds={},status={},result={}.",
instanceId, taskIds, status, result, e);
}
return false;
}
public boolean deleteAllTasks(Long instanceId) {
try {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
return execute(() -> taskDAO.simpleDelete(condition));
return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] deleteAllTasks cost {}ms", instanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e);
}
@ -321,26 +284,13 @@ public class TaskPersistenceService {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
condition.setSubInstanceId(subInstanceId);
return execute(() -> taskDAO.simpleDelete(condition));
return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] deleteAllSubInstanceTasks cost {}ms", instanceId, subInstanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] deleteAllSubInstanceTasks failed, instanceId={}, subInstanceId={}.", instanceId, subInstanceId, e);
}
return false;
}
public List<TaskDO> listAll() {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setQueryCondition("1 = 1");
return taskDAO.simpleQuery(query);
});
}catch (Exception e) {
log.error("[TaskPersistenceService] listAll failed.", e);
}
return Collections.emptyList();
}
private static SimpleTaskQuery genKeyQuery(Long instanceId, String taskId) {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
@ -348,7 +298,15 @@ public class TaskPersistenceService {
return condition;
}
private static <T> T execute(SupplierPlus<T> executor) throws Exception {
return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS);
private static <T> T execute(SupplierPlus<T> executor, Consumer<Long> slowQueryLogger) throws Exception {
long s = System.currentTimeMillis();
try {
return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS);
} finally {
long cost = System.currentTimeMillis() - s;
if (cost > SLOW_QUERY_RT_THRESHOLD) {
slowQueryLogger.accept(cost);
}
}
}
}
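For illustration, here is a minimal sketch of how a call site uses the new two-argument execute: the second parameter is invoked only when the wrapped operation takes longer than SLOW_QUERY_RT_THRESHOLD, and receives the elapsed time in milliseconds. The getAnyTask method below is hypothetical and used only for illustration; only execute, SLOW_QUERY_RT_THRESHOLD, taskDAO, SimpleTaskQuery and TaskDO come from the surrounding code.

    // Hypothetical call site, sketched for illustration only.
    public Optional<TaskDO> getAnyTask(Long instanceId) {
        try {
            SimpleTaskQuery query = new SimpleTaskQuery();
            query.setInstanceId(instanceId);
            // execute(...) retries the supplier and, once it completes, reports the cost to the
            // slow-query logger if it exceeded SLOW_QUERY_RT_THRESHOLD.
            return execute(
                    () -> taskDAO.simpleQuery(query).stream().findFirst(),
                    cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getAnyTask cost {}ms", instanceId, cost));
        } catch (Exception e) {
            log.error("[TaskPersistenceService] getAnyTask failed, instanceId={}.", instanceId, e);
        }
        return Optional.empty();
    }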

View File

@ -3,6 +3,7 @@ package tech.powerjob.worker.processor.impl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.context.ApplicationContext;
import org.springframework.core.annotation.AnnotationUtils;
import tech.powerjob.worker.annotation.PowerJobHandler;
import tech.powerjob.worker.extension.processor.ProcessorBean;
import tech.powerjob.worker.extension.processor.ProcessorDefinition;
@ -51,6 +52,12 @@ public class BuildInSpringMethodProcessorFactory extends AbstractBuildInSpringPr
Method[] methods = bean.getClass().getDeclaredMethods();
for (Method method : methods) {
PowerJobHandler powerJob = method.getAnnotation(PowerJobHandler.class);
// The annotation cannot be obtained from a CGLib proxy object directly; AnnotationUtils.findAnnotation() can still retrieve it. by GitHub@zhangxiang0907 https://github.com/PowerJob/PowerJob/issues/770
if (powerJob == null) {
powerJob = AnnotationUtils.findAnnotation(method, PowerJobHandler.class);
}
if (powerJob == null) {
continue;
}

View File

@ -0,0 +1,31 @@
package tech.powerjob.worker.persistence;
import tech.powerjob.worker.common.constants.TaskStatus;
import java.nio.charset.StandardCharsets;
/**
* AbstractTaskDAOTest
*
* @author tjq
* @since 2024/2/4
*/
public class AbstractTaskDAOTest {
protected static TaskDO buildTaskDO(String taskId, Long instanceId, TaskStatus taskStatus) {
TaskDO taskDO = new TaskDO();
taskDO.setTaskId(taskId);
taskDO.setInstanceId(instanceId);
taskDO.setSubInstanceId(instanceId);
taskDO.setTaskName("TEST_TASK");
taskDO.setTaskContent("TEST_CONTENT".getBytes(StandardCharsets.UTF_8));
taskDO.setAddress("127.0.0.1:10086");
taskDO.setStatus(taskStatus.getValue());
taskDO.setResult("SUCCESS");
taskDO.setFailedCnt(0);
taskDO.setLastModifiedTime(System.currentTimeMillis());
taskDO.setLastReportTime(System.currentTimeMillis());
taskDO.setCreatedTime(System.currentTimeMillis());
return taskDO;
}
}

View File

@ -0,0 +1,111 @@
package tech.powerjob.worker.persistence;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.util.StopWatch;
import tech.powerjob.worker.common.constants.StoreStrategy;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.core.processor.TaskResult;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
/**
* Task persistence layer - performance test
*
* @author tjq
* @since 2024/2/4
*/
@Slf4j(topic = "PERFORMANCE_TEST_LOGGER")
public class TaskDAOPerformanceTest extends AbstractTaskDAOTest {
private static final int INSERT_NUM = 100000;
private static final Long INSTANCE_ID = 10086L;
@Test
void testInsert() throws Exception {
TaskDAO noIndexDao = initTaskDao(false);
TaskDAO indexDao = initTaskDao(true);
for (int i = 0; i < 1; i++) {
testWriteThenRead(noIndexDao, INSERT_NUM, "no-idx-" + i);
testWriteThenRead(indexDao, INSERT_NUM, "uu-idx-" + i);
}
}
@SneakyThrows
private void testWriteThenRead(TaskDAO taskDAO, int num, String taskName) {
String logKey = "testWriteThenRead-" + taskName;
StopWatch stopWatch = new StopWatch();
AtomicLong atomicLong = new AtomicLong();
ForkJoinPool pool = new ForkJoinPool(256);
CountDownLatch latch = new CountDownLatch(num);
stopWatch.start("Insert");
for (int i = 0; i < num; i++) {
pool.execute(() -> {
long id = atomicLong.incrementAndGet();
String taskId = String.format("%s.%d", taskName, id);
TaskDO taskDO = buildTaskDO(taskId, INSTANCE_ID, TaskStatus.of(ThreadLocalRandom.current().nextInt(1, 7)));
try {
long s = System.currentTimeMillis();
taskDAO.save(taskDO);
long cost = System.currentTimeMillis() - s;
if (cost > 10) {
log.warn("[{}] id={} save cost too much: {}", logKey, id, cost);
}
} catch (Exception e) {
log.error("[{}] id={} save failed!", logKey, id, e);
} finally {
latch.countDown();
}
});
}
latch.await();
stopWatch.stop();
stopWatch.start("READ-getAllTaskResult");
// Read test: fetch all task results for the instance
List<TaskResult> allTaskResult = taskDAO.getAllTaskResult(INSTANCE_ID, INSTANCE_ID);
stopWatch.stop();
// Aggregation test: count tasks grouped by status
stopWatch.start("READ-countByStatus");
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(INSTANCE_ID);
query.setSubInstanceId(INSTANCE_ID);
query.setQueryContent("status, count(*) as num");
query.setOtherCondition("GROUP BY status");
List<Map<String, Object>> countByStatus = taskDAO.simpleQueryPlus(query);
stopWatch.stop();
String prettyPrint = stopWatch.prettyPrint();
System.out.println(logKey + ": " + prettyPrint);
log.info("[{}] {}", logKey, prettyPrint);
}
@SneakyThrows
private TaskDAO initTaskDao(boolean useIndex) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.initDatasource(StoreStrategy.DISK);
TaskDAO taskDAO = new TaskDAOImpl(useIndex, connectionFactory);
taskDAO.initTable();
return taskDAO;
}
}

View File

@ -24,7 +24,7 @@ import static org.junit.jupiter.api.Assertions.*;
* @since 2022/10/23
*/
@Slf4j
class TaskDAOTest {
class TaskDAOTest extends AbstractTaskDAOTest {
private static TaskDAO taskDAO;
@ -94,22 +94,4 @@ class TaskDAOTest {
assert allTaskResult.size() == 2;
}
private static TaskDO buildTaskDO(String taskId, Long instanceId, TaskStatus taskStatus) {
TaskDO taskDO = new TaskDO();
taskDO.setTaskId(taskId);
taskDO.setInstanceId(instanceId);
taskDO.setSubInstanceId(instanceId);
taskDO.setTaskName("TEST_TASK");
taskDO.setTaskContent("TEST_CONTENT".getBytes(StandardCharsets.UTF_8));
taskDO.setAddress("127.0.0.1:10086");
taskDO.setStatus(taskStatus.getValue());
taskDO.setResult("SUCCESS");
taskDO.setFailedCnt(0);
taskDO.setLastModifiedTime(System.currentTimeMillis());
taskDO.setLastReportTime(System.currentTimeMillis());
taskDO.setCreatedTime(System.currentTimeMillis());
return taskDO;
}
}

View File

@ -33,7 +33,7 @@ public class CommonTest {
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
req.setTaskTrackerAddress(NetUtils.getLocalHost() + ":27777");
req.setTaskTrackerAddress(NetUtils.getLocalHost4Test() + ":27777");
req.setInstanceInfo(instanceInfo);
req.setTaskId("0");

View File

@ -40,7 +40,7 @@ public class PersistenceServiceTest {
task.setFailedCnt(0);
task.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
task.setTaskName("ROOT_TASK");
task.setAddress(NetUtils.getLocalHost());
task.setAddress(NetUtils.getLocalHost4Test());
task.setLastModifiedTime(System.currentTimeMillis());
task.setCreatedTime(System.currentTimeMillis());
task.setLastReportTime(System.currentTimeMillis());
@ -56,14 +56,6 @@ public class PersistenceServiceTest {
Thread.sleep(60000);
}
@AfterEach
public void listData() {
System.out.println("============= listData =============");
List<TaskDO> result = taskPersistenceService.listAll();
System.out.println("size: " + result.size());
result.forEach(System.out::println);
}
@Test
public void testBatchSave(){
@ -78,7 +70,7 @@ public class PersistenceServiceTest {
task.setFailedCnt(0);
task.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
task.setTaskName("ROOT_TASK");
task.setAddress(NetUtils.getLocalHost());
task.setAddress(NetUtils.getLocalHost4Test());
task.setLastModifiedTime(System.currentTimeMillis());
task.setCreatedTime(System.currentTimeMillis());
task.setLastReportTime(System.currentTimeMillis());
@ -101,14 +93,14 @@ public class PersistenceServiceTest {
@Test
public void testUpdateLostTasks() throws Exception {
Thread.sleep(1000);
boolean success = taskPersistenceService.updateLostTasks(10086L, Lists.newArrayList(NetUtils.getLocalHost()), true);
boolean success = taskPersistenceService.updateLostTasks(10086L, Lists.newArrayList(NetUtils.getLocalHost4Test()), true);
System.out.println("updateLostTasks: " + success);
}
@Test
public void testGetAllUnFinishedTaskByAddress() throws Exception {
System.out.println("=============== testGetAllUnFinishedTaskByAddress ===============");
List<TaskDO> res = taskPersistenceService.getAllUnFinishedTaskByAddress(10086L, NetUtils.getLocalHost());
List<TaskDO> res = taskPersistenceService.getAllUnFinishedTaskByAddress(10086L, NetUtils.getLocalHost4Test());
System.out.println(res);
}

View File

@ -21,7 +21,7 @@ public class TestUtils {
req.setJobId(1L);
req.setInstanceId(10086L);
req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT));
req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost4Test() + ":" + RemoteConstant.DEFAULT_WORKER_PORT));
req.setJobParams("JobParams");
req.setInstanceParams("InstanceParams");

View File

@ -20,6 +20,28 @@
<appender-ref ref="STDOUT"/>
</logger>
<!-- Dedicated to performance tests: write to a local file so that logging I/O does not itself become the bottleneck -->
<appender name="PERFORMANCE_TEST_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${user.home}/powerjob/worker/test/logs/performance_test.log</file>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|%thread|%msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${user.home}/powerjob/worker/test/logs/performance_test.log.%d{yyyy-MM-dd}</fileNamePattern>
<MaxHistory>7</MaxHistory>
</rollingPolicy>
</appender>
<appender name="ASYNC_PERFORMANCE_TEST_APPENDER" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>512</queueSize>
<discardingThreshold>0</discardingThreshold>
<neverBlock>true</neverBlock>
<appender-ref ref="PERFORMANCE_TEST_APPENDER"/>
</appender>
<logger name="PERFORMANCE_TEST_LOGGER" level="INFO" additivity="false">
<appender-ref ref="ASYNC_PERFORMANCE_TEST_APPENDER"/>
</logger>
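<!-- Usage note (illustrative): tests bind to this logger either via @Slf4j(topic = "PERFORMANCE_TEST_LOGGER"), as TaskDAOPerformanceTest does, or via LoggerFactory.getLogger("PERFORMANCE_TEST_LOGGER"); additivity="false" keeps these records out of the console appender attached to the root logger. -->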
<!-- Console output log level -->
<root level="INFO">
<appender-ref ref="STDOUT"/>