Merge branch '4.3.4'

tjq 2023-08-13 23:05:55 +08:00
commit 58e542c69a
90 changed files with 2203 additions and 499 deletions

SECURITY.md Normal file
@ -0,0 +1,4 @@
# Security notices relating to PowerJob
Please disclose any security issues or vulnerabilities found through [Tidelift's coordinated disclosure system](https://tidelift.com/security) or to the maintainers privately (tengjiqi@gmail.com).

@ -90,7 +90,7 @@ if [ "$startup" = "y" ] || [ "$startup" = "Y" ]; then
echo "================== 准备启动 powerjob-server =================="
docker run -d \
--name powerjob-server \
-p 7700:7700 -p 10086:10086 -p 5001:5005 -p 10001:10000 \
-p 7700:7700 -p 10086:10086 -p 10010:10010 -p 5001:5005 -p 10001:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
-e PARAMS="--spring.profiles.active=pre" \
-e TZ="Asia/Shanghai" \

@ -6,7 +6,7 @@
<groupId>tech.powerjob</groupId>
<artifactId>powerjob</artifactId>
<version>4.3.3</version>
<version>4.3.4</version>
<packaging>pom</packaging>
<name>powerjob</name>
<url>http://www.powerjob.tech</url>

@ -5,18 +5,18 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>4.3.3</version>
<version>4.3.4</version>
<packaging>jar</packaging>
<properties>
<junit.version>5.9.1</junit.version>
<fastjson.version>1.2.83</fastjson.version>
<powerjob.common.version>4.3.3</powerjob.common.version>
<powerjob.common.version>4.3.4</powerjob.common.version>
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
</properties>

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>4.3.3</version>
<version>4.3.4</version>
<packaging>jar</packaging>
<properties>

@ -5,7 +5,6 @@ import java.net.NetworkInterface;
/**
* Configuration passed in via JVM startup parameters
*
*
* @author tjq
* @since 2020/8/8
*/
@ -16,7 +15,15 @@ public class PowerJobDKey {
*/
public static final String PREFERRED_NETWORK_INTERFACE = "powerjob.network.interface.preferred";
/**
* Bind address; usually the local NIC address
*/
public static final String BIND_LOCAL_ADDRESS = "powerjob.network.local.address";
/**
* External address (optional; defaults to the bind address). In NAT scenarios, communication can be achieved by passing a separate external address.
*/
public static final String NT_EXTERNAL_ADDRESS = "powerjob.network.external.address";
public static final String NT_EXTERNAL_PORT = "powerjob.network.external.port";
/**
* Java regular expressions for network interfaces that will be ignored.

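Taken together with the javadoc above, a minimal sketch of how a worker behind NAT might resolve the two addresses; the resolution code itself is not part of this hunk, and `NetUtils.getLocalHost()` is simply a convenient fallback used elsewhere in this commit:

```java
// Hedged sketch: the external address falls back to the bind address when unset.
String bindAddress = System.getProperty(PowerJobDKey.BIND_LOCAL_ADDRESS, NetUtils.getLocalHost());
String externalAddress = System.getProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, bindAddress);
```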
@ -0,0 +1,10 @@
package tech.powerjob.common.exception;
/**
* ImpossibleException
*
* @author tjq
* @since 2023/7/12
*/
public class ImpossibleException extends RuntimeException {
}

@ -8,9 +8,12 @@ import com.fasterxml.jackson.databind.json.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.exception.ImpossibleException;
import tech.powerjob.common.exception.PowerJobException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* JSON utility class
@ -27,6 +30,8 @@ public class JsonUtils {
.configure(JsonParser.Feature.IGNORE_UNDEFINED, true)
.build();
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>> () {};
private JsonUtils(){
}
@ -67,6 +72,18 @@ public class JsonUtils {
return JSON_MAPPER.readValue(json, clz);
}
public static Map<String, Object> parseMap(String json) {
if (StringUtils.isEmpty(json)) {
return new HashMap<>();
}
try {
return JSON_MAPPER.readValue(json, MAP_TYPE_REFERENCE);
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
throw new ImpossibleException();
}
public static <T> T parseObject(byte[] b, Class<T> clz) throws IOException {
return JSON_MAPPER.readValue(b, clz);
}

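`ExceptionUtils.rethrow` always throws, but the compiler cannot prove that, so the trailing `throw new ImpossibleException()` exists only to satisfy the method's return contract. A minimal usage sketch of the new helper:

```java
// blank or null input yields an empty map instead of an exception
Map<String, Object> params = JsonUtils.parseMap("{\"mode\":\"BASE\",\"responseSize\":12}");
Map<String, Object> empty = JsonUtils.parseMap("");
```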
@ -56,6 +56,21 @@ public class NetUtils {
return ThreadLocalRandom.current().nextInt(RND_PORT_START, RND_PORT_END);
}
/**
* Check whether an IP and port are reachable
* @param ip IP
* @param port port
* @return whether the address is reachable
*/
public static boolean checkIpPortAvailable(String ip, int port) {
try (Socket socket = new Socket()) {
socket.connect(new InetSocketAddress(ip, port), 1000);
return true;
} catch (Exception e) {
return false;
}
}
/**
* Get the local IP address
*
@ -155,6 +170,9 @@ public class NetUtils {
log.warn("[Net] findNetworkInterface failed", e);
}
// sort by interface index, the smaller is preferred.
validNetworkInterfaces.sort(Comparator.comparingInt(NetworkInterface::getIndex));
// Try to find the preferred one
for (NetworkInterface networkInterface : validNetworkInterfaces) {
if (isPreferredNetworkInterface(networkInterface)) {

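A short usage sketch of the new probe (the address and port below are illustrative; 7700 is the server port seen in the deploy script earlier in this commit). The one-second connect timeout is hard-coded above, so an unreachable host fails fast:

```java
// true only if something accepts a TCP connection within one second
boolean serverReachable = NetUtils.checkIpPortAvailable("127.0.0.1", 7700);
```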
@ -0,0 +1,32 @@
package tech.powerjob.common.utils;
import org.apache.commons.lang3.StringUtils;
/**
* PropertyUtils
*
* @author tjq
* @since 2023/7/15
*/
public class PropertyUtils {
public static String readProperty(String key, String defaultValue) {
// Read from JVM startup parameters (-D)
String property = System.getProperty(key);
if (StringUtils.isNotEmpty(property)) {
return property;
}
// Read from environment variables
property = System.getenv(key);
if (StringUtils.isNotEmpty(property)) {
return property;
}
// Some operating systems are not compatible with keys like a.b.c; convert to a_b_c and read again. PowerJob supports both styles of environment variable keys.
property = System.getenv(key.replaceAll("\\.", "_"));
if (StringUtils.isNotEmpty(property)) {
return property;
}
return defaultValue;
}
}

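A usage sketch combining this helper with the new `PowerJobDKey` constants; note the third lookup only swaps dots for underscores, so `powerjob.network.external.address` also matches an environment variable named `powerjob_network_external_address`:

```java
// -D argument wins, then the dotted env var, then the underscore variant
String externalAddress = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, NetUtils.getLocalHost());
```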
@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-official-processors</artifactId>
<version>4.3.3</version>
<version>4.3.4</version>
<packaging>jar</packaging>
<properties>
@ -20,7 +20,7 @@
<!-- 不会被打包的部分scope 只能是 test 或 provide -->
<junit.version>5.9.1</junit.version>
<logback.version>1.2.9</logback.version>
<powerjob.worker.version>4.3.3</powerjob.worker.version>
<powerjob.worker.version>4.3.4</powerjob.worker.version>
<spring.jdbc.version>5.2.9.RELEASE</spring.jdbc.version>
<h2.db.version>2.1.214</h2.db.version>
<mysql.version>8.0.28</mysql.version>

@ -0,0 +1,218 @@
package tech.powerjob.official.processors.impl;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import lombok.*;
import org.apache.commons.lang3.RandomStringUtils;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.official.processors.CommonBasicProcessor;
import tech.powerjob.official.processors.util.CommonUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import tech.powerjob.worker.log.OmsLogger;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
/**
* Processor for feature verification, helping users quickly verify the features they want to test
*
* @author tjq
* @since 2023/8/13
*/
public class VerificationProcessor extends CommonBasicProcessor implements MapReduceProcessor, BroadcastProcessor {
@Override
protected ProcessResult process0(TaskContext taskContext) throws Exception {
final OmsLogger omsLogger = taskContext.getOmsLogger();
final String paramsStr = CommonUtils.parseParams(taskContext);
final VerificationParam verificationParam = JSONObject.parseObject(paramsStr, VerificationParam.class);
final Mode mode = Mode.of(verificationParam.getMode());
switch (mode) {
case ERROR:
return new ProcessResult(false, "EXECUTE_FAILED_FOR_TEST");
case EXCEPTION:
throw new PowerJobException("exception for test");
case TIMEOUT:
final Long sleepMs = Optional.ofNullable(verificationParam.getSleepMs()).orElse(3600000L);
Thread.sleep(sleepMs);
return new ProcessResult(true, "AFTER_SLEEP_" + sleepMs);
case RETRY:
int currentRetryTimes = taskContext.getCurrentRetryTimes();
int maxRetryTimes = taskContext.getMaxRetryTimes();
omsLogger.info("[Retry] currentRetryTimes: {}, maxRetryTimes: {}", currentRetryTimes, maxRetryTimes);
if (currentRetryTimes < maxRetryTimes) {
Thread.sleep(100);
omsLogger.info("[Retry] currentRetryTimes[{}] < maxRetryTimes[{}], return failed status!", currentRetryTimes, maxRetryTimes);
return new ProcessResult(false, "FAILED_UNTIL_LAST_RETRY_" + currentRetryTimes);
} else {
omsLogger.info("[Retry] last retry, return success status!");
return new ProcessResult(true, "RETRY_SUCCESSFULLY!");
}
case MR:
if (isRootTask()) {
final int batchNum = Optional.ofNullable(verificationParam.getBatchNum()).orElse(10);
final int batchSize = Optional.ofNullable(verificationParam.getBatchSize()).orElse(100);
omsLogger.info("[VerificationProcessor] start root task~");
List<TestSubTask> subTasks = new ArrayList<>();
for (int a = 0; a < batchNum; a++) {
for (int b = 0; b < batchSize; b++) {
int x = a * batchSize + b;
subTasks.add(new TestSubTask("task_" + x, x));
}
map(subTasks, "MAP_TEST_TASK_" + a);
omsLogger.info("[VerificationProcessor] [{}] map one batch successfully~", batchNum);
subTasks.clear();
}
omsLogger.info("[VerificationProcessor] all map successfully!");
return new ProcessResult(true, "MAP_SUCCESS");
} else {
String taskId = taskContext.getTaskId();
final Double successRate = Optional.ofNullable(verificationParam.getSubTaskSuccessRate()).orElse(0.5);
final double rd = ThreadLocalRandom.current().nextDouble(0, 1);
boolean success = rd <= successRate;
long processCost = ThreadLocalRandom.current().nextLong(277);
Thread.sleep(processCost);
omsLogger.info("[VerificationProcessor] [MR] taskId:{}, processCost: {}, success:{}", taskId, processCost, success);
return new ProcessResult(success, RandomStringUtils.randomAlphanumeric(3));
}
}
String randomMsg = RandomStringUtils.randomAlphanumeric(Optional.ofNullable(verificationParam.getResponseSize()).orElse(10));
omsLogger.info("generate random string: {}", randomMsg);
return new ProcessResult(true, "EXECUTE_SUCCESSFULLY_" + randomMsg);
}
@Override
public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
List<String> successTaskIds = Lists.newArrayList();
List<String> failedTaskIds = Lists.newArrayList();
StringBuilder sb = new StringBuilder();
taskResults.forEach(taskResult -> {
sb.append("tId:").append(taskResult.getTaskId()).append(";")
.append("tSuc:").append(taskResult.isSuccess()).append(";")
.append("tRes:").append(taskResult.getResult());
if (taskResult.isSuccess()) {
successTaskIds.add(taskResult.getTaskId());
} else {
failedTaskIds.add(taskResult.getTaskId());
}
});
context.getOmsLogger().info("[Reduce] [summary] successTaskNum: {}, failedTaskNum: {}, successRate: {}",
successTaskIds.size(), failedTaskIds.size(), 1.0 * successTaskIds.size() / (successTaskIds.size() + failedTaskIds.size()));
context.getOmsLogger().info("[Reduce] successTaskIds: {}", successTaskIds);
context.getOmsLogger().info("[Reduce] failedTaskIds: {}", failedTaskIds);
return new ProcessResult(true, sb.toString());
}
/* ************************** Broadcast task section ************************** */
@Override
public ProcessResult preProcess(TaskContext context) throws Exception {
context.getOmsLogger().info("start to preProcess, current worker IP is {}.", NetUtils.getLocalHost());
return new ProcessResult(true, "preProcess successfully!");
}
@Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("start to postProcess, current worker IP is {}.", NetUtils.getLocalHost());
omsLogger.info("====== All Node's Process Result ======");
taskResults.forEach(r -> omsLogger.info("taskId:{},success:{},result:{}", r.getTaskId(), r.isSuccess(), r.getResult()));
return new ProcessResult(true, "postProcess successfully!");
}
/* ************************** Broadcast task section ************************** */
enum Mode {
/**
* Basic mode: return a response directly
* {"mode":"BASE","responseSize":12}
*/
BASE,
/**
* Timeout: sleep for a while to test timeout control
* {"mode":"TIMEOUT","sleepMs":3600000}
*/
TIMEOUT,
/**
* Test execution failure: the response returns success = false
* {"mode":"ERROR"}
*/
ERROR,
/**
* Test execution exception: an exception is thrown
* {"mode":"EXCEPTION"}
*/
EXCEPTION,
/**
* MapReduce: the console must be configured with the MapReduce execution mode
* {"mode":"MR","batchNum": 10, "batchSize": 20,"subTaskSuccessRate":0.7}
*/
MR,
/**
* Succeeds after retries; configure the task retry count on the job
* {"mode":"RETRY"}
*/
RETRY
;
public static Mode of(String v) {
for (Mode m : values()) {
if (m.name().equalsIgnoreCase(v)) {
return m;
}
}
return Mode.BASE;
}
}
@Data
public static class VerificationParam implements Serializable {
/**
* Verification mode
*/
private String mode;
/**
* Sleep duration, used to verify timeout handling
*/
private Long sleepMs;
/**
* MR: batch size, used to verify MapReduce
*/
private Integer batchSize;
/**
* MR: number of batches (batchNum)
*/
private Integer batchNum;
/**
* MR: sub-task success rate
*/
private Double subTaskSuccessRate;
private Integer responseSize;
}
@Getter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public static class TestSubTask {
private String taskName;
private int id;
}
}

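Note that `Mode.of` is case-insensitive and falls back to `BASE` for null or unrecognized input, so a job configured without params still runs in basic mode:

```java
Mode.of("mr");      // Mode.MR
Mode.of("Timeout"); // Mode.TIMEOUT
Mode.of(null);      // Mode.BASE (fallback)
```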
@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -21,8 +21,8 @@
<logback.version>1.2.9</logback.version>
<springboot.version>2.7.4</springboot.version>
<powerjob-remote-impl-http.version>4.3.3</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.3.3</powerjob-remote-impl-akka.version>
<powerjob-remote-impl-http.version>4.3.4</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.3.4</powerjob-remote-impl-akka.version>
<gatling.version>3.9.0</gatling.version>
<gatling-maven-plugin.version>4.2.9</gatling-maven-plugin.version>

@ -5,11 +5,11 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<version>4.3.3</version>
<version>4.3.4</version>
<artifactId>powerjob-remote-framework</artifactId>
<properties>
@ -17,7 +17,7 @@
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<powerjob-common.version>4.3.3</powerjob-common.version>
<powerjob-common.version>4.3.4</powerjob-common.version>
<reflections.version>0.10.2</reflections.version>

@ -20,7 +20,7 @@ public class Address implements Serializable {
private int port;
public String toFullAddress() {
return String.format("%s:%d", host, port);
return toFullAddress(host, port);
}
public static Address fromIpv4(String ipv4) {
@ -30,6 +30,10 @@ public class Address implements Serializable {
.setPort(Integer.parseInt(split[1]));
}
public static String toFullAddress(String host, int port) {
return String.format("%s:%d", host, port);
}
@Override
public String toString() {
return toFullAddress();

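With the new static helper, callers holding a raw host and port can format an address without constructing an `Address` first; a small sketch (accessors assumed to be Lombok-generated, as elsewhere in this module):

```java
Address address = Address.fromIpv4("192.168.1.1:10086");
String viaInstance = address.toFullAddress();                   // "192.168.1.1:10086"
String viaStatic = Address.toFullAddress("192.168.1.1", 10086); // same result
```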
@ -5,19 +5,19 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-akka</artifactId>
<version>4.3.3</version>
<version>4.3.4</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<powerjob-remote-framework.version>4.3.3</powerjob-remote-framework.version>
<powerjob-remote-framework.version>4.3.4</powerjob-remote-framework.version>
<akka.version>2.6.13</akka.version>
</properties>

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-http</artifactId>
<version>4.3.3</version>
<version>4.3.4</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
@ -18,7 +18,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<vertx.version>4.3.7</vertx.version>
<powerjob-remote-framework.version>4.3.3</powerjob-remote-framework.version>
<powerjob-remote-framework.version>4.3.4</powerjob-remote-framework.version>
</properties>
<dependencies>

@ -10,6 +10,8 @@ import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.json.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.URL;
@ -25,6 +27,7 @@ import java.util.concurrent.CompletionStage;
* @author tjq
* @since 2023/1/1
*/
@Slf4j
public class VertxTransporter implements Transporter {
private final HttpClient httpClient;
@ -90,6 +93,8 @@ public class VertxTransporter implements Transporter {
return Future.succeededFuture(x.toJsonObject().mapTo(clz));
});
}).toCompletionStage();
})
.onFailure(t -> log.warn("[VertxTransporter] post to url[{}] failed,msg: {}", url, ExceptionUtils.getMessage(t)))
.toCompletionStage();
}
}

@ -3,8 +3,8 @@ FROM adoptopenjdk:8-jdk-hotspot
# Maintainer
MAINTAINER tengjiqi@gmail.com
# Download and install Maven
RUN curl -O https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.9.2/binaries/apache-maven-3.9.2-bin.tar.gz
RUN tar -zxvf apache-maven-3.9.2-bin.tar.gz && mv apache-maven-3.9.2 /opt/powerjob-maven && rm -rf apache-maven-3.9.2-bin.tar.gz
RUN curl -O https://mirrors.aliyun.com/apache/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz
RUN tar -zxvf apache-maven-3.9.4-bin.tar.gz && mv apache-maven-3.9.4 /opt/powerjob-maven && rm -rf apache-maven-3.9.4-bin.tar.gz
# Replace the Maven configuration file
RUN rm -rf /opt/powerjob-maven/conf/settings.xml
COPY settings.xml /opt/powerjob-maven/conf/settings.xml

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>4.3.3</version>
<version>4.3.4</version>
<packaging>pom</packaging>
<modules>
@ -35,6 +35,7 @@
<db2-jdbc.version>11.5.0.0</db2-jdbc.version>
<postgresql.version>42.2.14</postgresql.version>
<h2.db.version>2.1.214</h2.db.version>
<mongodb-driver-sync.version>4.10.2</mongodb-driver-sync.version>
<zip4j.version>2.11.2</zip4j.version>
<jgit.version>5.7.0.202003110725-r</jgit.version>
@ -49,10 +50,12 @@
<groovy.version>3.0.10</groovy.version>
<cron-utils.version>9.1.6</cron-utils.version>
<powerjob-common.version>4.3.3</powerjob-common.version>
<powerjob-remote-impl-http.version>4.3.3</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.3.3</powerjob-remote-impl-akka.version>
<powerjob-common.version>4.3.4</powerjob-common.version>
<powerjob-remote-impl-http.version>4.3.4</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.3.4</powerjob-remote-impl-akka.version>
<springdoc-openapi-ui.version>1.6.14</springdoc-openapi-ui.version>
<aliyun-sdk-oss.version>3.17.1</aliyun-sdk-oss.version>
<commons-collections4.version>4.4</commons-collections4.version>
</properties>
<dependencyManagement>
@ -97,6 +100,28 @@
<artifactId>powerjob-server-starter</artifactId>
<version>${project.version}</version>
</dependency>
<!-- 存储扩展-MongoDB未使用可移除 -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>${mongodb-driver-sync.version}</version>
</dependency>
<!-- 存储扩展-阿里云OSS未使用可移除 -->
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>${aliyun-sdk-oss.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-collections4 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${commons-collections4.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -188,11 +213,6 @@
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -18,4 +18,11 @@
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,95 @@
package tech.powerjob.server.common.spring.condition;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;
import tech.powerjob.common.utils.CollectionUtils;
import java.util.List;
/**
* PropertyAndOneBeanCondition
* Uniqueness rule applied when multiple implementations of an interface exist
*
* @author tjq
* @since 2023/7/30
*/
@Slf4j
public abstract class PropertyAndOneBeanCondition implements Condition {
/**
* The bean is loaded if any one of these keys exists in the configuration; an empty list means no check
* @return Keys
*/
protected abstract List<String> anyConfigKey();
/**
* Bean uniqueness check; null means no check
* @return beanType
*/
protected abstract Class<?> beanType();
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
boolean anyCfgExist = checkAnyConfigExist(context);
log.info("[PropertyAndOneBeanCondition] [{}] check any config exist result with keys={}: {}", thisName(), anyConfigKey(), anyCfgExist);
if (!anyCfgExist) {
return false;
}
Class<?> beanType = beanType();
if (beanType == null) {
return true;
}
boolean exist = checkBeanExist(context);
log.info("[PropertyAndOneBeanCondition] [{}] bean of type[{}] exist check result: {}", thisName(), beanType.getSimpleName(), exist);
if (exist) {
log.info("[PropertyAndOneBeanCondition] [{}] bean of type[{}] already exist, skip load!", thisName(), beanType.getSimpleName());
return false;
}
return true;
}
private boolean checkAnyConfigExist(ConditionContext context) {
Environment environment = context.getEnvironment();
List<String> keys = anyConfigKey();
if (CollectionUtils.isEmpty(keys)) {
return true;
}
// Check the keys one by one; any single match is enough
for (String key : keys) {
if (StringUtils.isNotEmpty(environment.getProperty(key))) {
return true;
}
}
return false;
}
private boolean checkBeanExist(ConditionContext context) {
ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
if (beanFactory == null) {
return false;
}
try {
beanFactory.getBean(beanType());
return true;
} catch (NoSuchBeanDefinitionException ignore) {
return false;
}
}
private String thisName() {
return this.getClass().getSimpleName();
}
}

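A hypothetical subclass (the property key below is illustrative, not taken from this commit) showing the intended pattern: load a storage implementation only when its endpoint is configured and no other `DFsService` bean has been registered yet:

```java
import java.util.Collections;
import java.util.List;

import tech.powerjob.server.extension.dfs.DFsService;

public class OssStorageCondition extends PropertyAndOneBeanCondition {

    @Override
    protected List<String> anyConfigKey() {
        // illustrative key; the real configuration keys are not shown in this diff
        return Collections.singletonList("oms.storage.dfs.alioss.endpoint");
    }

    @Override
    protected Class<?> beanType() {
        return DFsService.class;
    }
}
```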
@ -0,0 +1,7 @@
/**
* Common Spring capability package
*
* @author tjq
* @since 2023/7/30
*/
package tech.powerjob.server.common.spring;

@ -0,0 +1,42 @@
package tech.powerjob.server.common.utils;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* Test utility dedicated to the development team
*
* @author tjq
* @since 2023/7/31
*/
public class TestUtils {
private static final String TEST_CONFIG_NAME = "/.powerjob_test";
public static final String KEY_PHONE_NUMBER = "phone";
public static final String KEY_MONGO_URI = "mongoUri";
/**
* Fetch the local test configuration, mainly used to store secrets
* @return test configuration
*/
public static Map<String, Object> fetchTestConfig() {
try {
// Going forward, all secrets for local testing are stored in .powerjob_test for easier management
String content = FileUtils.readFileToString(new File(System.getProperty("user.home").concat(TEST_CONFIG_NAME)), StandardCharsets.UTF_8);
if (StringUtils.isNotEmpty(content)) {
return JSONObject.parseObject(content);
}
} catch (Exception ignore) {
}
return Maps.newHashMap();
}
}

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

@ -1,17 +1,19 @@
package tech.powerjob.server.extension.defaultimpl.alarm;
package tech.powerjob.server.core.alarm;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.server.extension.alarm.Alarm;
import tech.powerjob.server.extension.alarm.AlarmTarget;
import tech.powerjob.server.extension.alarm.Alarmable;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Alarm service
@ -38,10 +40,10 @@ public class AlarmCenter {
});
}
public void alarmFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
public void alarmFailed(Alarm alarm, List<AlarmTarget> alarmTargets) {
POOL.execute(() -> BEANS.forEach(alarmable -> {
try {
alarmable.onFailed(alarm, targetUserList);
alarmable.onFailed(alarm, alarmTargets);
}catch (Exception e) {
log.warn("[AlarmCenter] alarm failed.", e);
}

@ -0,0 +1,35 @@
package tech.powerjob.server.core.alarm;
import org.springframework.beans.BeanUtils;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.server.extension.alarm.AlarmTarget;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* AlarmUtils
*
* @author tjq
* @since 2023/7/31
*/
public class AlarmUtils {
public static List<AlarmTarget> convertUserInfoList2AlarmTargetList(List<UserInfoDO> userInfoDOS) {
if (CollectionUtils.isEmpty(userInfoDOS)) {
return Collections.emptyList();
}
return userInfoDOS.stream().map(AlarmUtils::convertUserInfo2AlarmTarget).collect(Collectors.toList());
}
public static AlarmTarget convertUserInfo2AlarmTarget(UserInfoDO userInfoDO) {
AlarmTarget alarmTarget = new AlarmTarget();
BeanUtils.copyProperties(userInfoDO, alarmTarget);
alarmTarget.setName(userInfoDO.getUsername());
return alarmTarget;
}
}

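A tiny usage sketch of the converter, matching how `InstanceManager` and `WorkflowInstanceManager` call it later in this commit (variable names follow those call sites):

```java
List<AlarmTarget> targets = AlarmUtils.convertUserInfoList2AlarmTargetList(userList);
alarmCenter.alarmFailed(content, targets);
```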
@ -1,4 +1,4 @@
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
package tech.powerjob.server.core.alarm.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@ -14,9 +14,9 @@ import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.extension.Alarmable;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.alarm.AlarmTarget;
import tech.powerjob.server.extension.alarm.Alarmable;
import tech.powerjob.server.extension.alarm.Alarm;
import javax.annotation.PostConstruct;
import java.util.List;
@ -46,7 +46,7 @@ public class DingTalkAlarmService implements Alarmable {
private static final String EMPTY_TAG = "EMPTY";
@Override
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
public void onFailed(Alarm alarm, List<AlarmTarget> targetUserList) {
if (dingTalkUtils == null) {
return;
}

@ -1,4 +1,4 @@
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
package tech.powerjob.server.core.alarm.impl;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;

@ -1,10 +1,10 @@
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
package tech.powerjob.server.core.alarm.impl;
import org.springframework.beans.factory.annotation.Value;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import tech.powerjob.server.extension.alarm.AlarmTarget;
import tech.powerjob.server.extension.alarm.Alarm;
import tech.powerjob.server.extension.alarm.Alarmable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
@ -36,7 +36,7 @@ public class MailAlarmService implements Alarmable {
private String from;
@Override
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
public void onFailed(Alarm alarm, List<AlarmTarget> targetUserList) {
if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
return;
}
@ -44,7 +44,7 @@ public class MailAlarmService implements Alarmable {
SimpleMailMessage sm = new SimpleMailMessage();
try {
sm.setFrom(from);
sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).filter(Objects::nonNull).toArray(String[]::new));
sm.setTo(targetUserList.stream().map(AlarmTarget::getEmail).filter(Objects::nonNull).toArray(String[]::new));
sm.setSubject(alarm.fetchTitle());
sm.setText(alarm.fetchContent());

@ -1,11 +1,11 @@
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
package tech.powerjob.server.core.alarm.impl;
import com.alibaba.fastjson.JSONObject;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import tech.powerjob.server.extension.alarm.AlarmTarget;
import tech.powerjob.server.extension.alarm.Alarm;
import tech.powerjob.server.extension.alarm.Alarmable;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.RequestBody;
@ -29,7 +29,7 @@ public class WebHookAlarmService implements Alarmable {
private static final String HTTPS_PROTOCOL_PREFIX = "https://";
@Override
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
public void onFailed(Alarm alarm, List<AlarmTarget> targetUserList) {
if (CollectionUtils.isEmpty(targetUserList)) {
return;
}

@ -1,6 +1,8 @@
package tech.powerjob.server.extension.defaultimpl.alarm.module;
package tech.powerjob.server.core.alarm.module;
import lombok.Data;
import lombok.experimental.Accessors;
import tech.powerjob.server.extension.alarm.Alarm;
/**
* Alarm object for job execution failures
@ -9,6 +11,7 @@ import lombok.Data;
* @since 2020/4/30
*/
@Data
@Accessors(chain = true)
public class JobInstanceAlarm implements Alarm {
/**
* Application ID

@ -1,7 +1,8 @@
package tech.powerjob.server.extension.defaultimpl.alarm.module;
package tech.powerjob.server.core.alarm.module;
import tech.powerjob.common.model.PEWorkflowDAG;
import lombok.Data;
import tech.powerjob.server.extension.alarm.Alarm;
/**
* Alarm object for workflow execution failures

@ -26,6 +26,7 @@ import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.web.multipart.MultipartFile;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.exception.ImpossibleException;
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.common.model.GitRepoInfo;
import tech.powerjob.common.request.ServerDeployContainerRequest;
@ -40,9 +41,10 @@ import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.mongodb.GridFsManager;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
import tech.powerjob.server.persistence.storage.Constants;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.remote.transporter.impl.ServerURLFactory;
import tech.powerjob.server.remote.transporter.TransportService;
@ -74,7 +76,7 @@ public class ContainerService {
@Resource
private ContainerInfoRepository containerInfoRepository;
@Resource
private GridFsManager gridFsManager;
private DFsService dFsService;
@Resource
private TransportService transportService;
@ -149,6 +151,8 @@ public class ContainerService {
*/
public String uploadContainerJarFile(MultipartFile file) throws IOException {
log.info("[ContainerService] start to uploadContainerJarFile, fileName={},size={}", file.getName(), file.getSize());
String workerDirStr = OmsFileUtils.genTemporaryWorkPath();
String tmpFileStr = workerDirStr + "tmp.jar";
@ -164,8 +168,10 @@ public class ContainerService {
String md5 = OmsFileUtils.md5(tmpFile);
String fileName = genContainerJarName(md5);
// Upload to MongoDB; this step is quite time-consuming and makes the whole API rather slow... but it's not worth spawning a thread for it
gridFsManager.store(tmpFile, GridFsManager.CONTAINER_BUCKET, fileName);
// Upload to DFS; this step is quite time-consuming and makes the whole API rather slow... but it's not worth spawning a thread for it
FileLocation fl = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(fileName);
StoreRequest storeRequest = new StoreRequest().setLocalFile(tmpFile).setFileLocation(fl);
dFsService.store(storeRequest);
// Copy the file to the correct path
String finalFileStr = OmsFileUtils.genContainerJarPath() + fileName;
@ -175,9 +181,14 @@ public class ContainerService {
}
FileUtils.moveFile(tmpFile, finalFile);
log.info("[ContainerService] uploadContainerJarFile successfully,md5={}", md5);
return md5;
}finally {
} catch (Throwable t) {
log.error("[ContainerService] uploadContainerJarFile failed!", t);
ExceptionUtils.rethrow(t);
throw new ImpossibleException();
} finally {
CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(workerDir));
}
}
@ -196,9 +207,17 @@ public class ContainerService {
if (localFile.exists()) {
return localFile;
}
if (gridFsManager.available()) {
downloadJarFromGridFS(fileName, localFile);
FileLocation fileLocation = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(fileName);
try {
Optional<FileMeta> fileMetaOpt = dFsService.fetchFileMeta(fileLocation);
if (fileMetaOpt.isPresent()) {
dFsService.download(new DownloadRequest().setFileLocation(fileLocation).setTarget(localFile));
}
} catch (Exception e) {
log.warn("[ContainerService] fetchContainerJarFile from dsf failed, version: {}", version, e);
}
return localFile;
}
@ -404,12 +423,14 @@ public class ContainerService {
String jarFileName = genContainerJarName(container.getVersion());
if (!gridFsManager.exists(GridFsManager.CONTAINER_BUCKET, jarFileName)) {
remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version.");
gridFsManager.store(jarWithDependency, GridFsManager.CONTAINER_BUCKET, jarFileName);
remote.sendText("SYSTEM: upload to GridFS successfully~");
}else {
FileLocation dfsFL = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(jarFileName);
Optional<FileMeta> dfsMetaOpt = dFsService.fetchFileMeta(dfsFL);
if (dfsMetaOpt.isPresent()) {
remote.sendText("SYSTEM: find the jar resource in remote successfully, so it's no need to upload anymore.");
} else {
remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version.");
dFsService.store(new StoreRequest().setFileLocation(dfsFL).setLocalFile(jarWithDependency));
remote.sendText("SYSTEM: upload to GridFS successfully~");
}
// Move the file from the temporary working directory to the final directory
@ -455,13 +476,19 @@ public class ContainerService {
if (targetFile.exists()) {
return;
}
if (!gridFsManager.exists(GridFsManager.CONTAINER_BUCKET, mongoFileName)) {
log.warn("[ContainerService] can't find container's jar file({}) in gridFS.", mongoFileName);
return;
}
try {
FileLocation dfsFL = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(mongoFileName);
Optional<FileMeta> dfsMetaOpt = dFsService.fetchFileMeta(dfsFL);
if (!dfsMetaOpt.isPresent()) {
log.warn("[ContainerService] can't find container's jar file({}) in gridFS.", mongoFileName);
return;
}
FileUtils.forceMkdirParent(targetFile);
gridFsManager.download(targetFile, GridFsManager.CONTAINER_BUCKET, mongoFileName);
dFsService.download(new DownloadRequest().setTarget(targetFile).setFileLocation(dfsFL));
}catch (Exception e) {
CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(targetFile));
ExceptionUtils.rethrow(e);

@ -1,43 +1,45 @@
package tech.powerjob.server.core.instance;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import tech.powerjob.common.enums.LogLevel;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.InstanceLogContent;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.persistence.StringPage;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.local.LocalInstanceLogDO;
import tech.powerjob.server.persistence.local.LocalInstanceLogRepository;
import tech.powerjob.server.persistence.mongodb.GridFsManager;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.enums.LogLevel;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.InstanceLogContent;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.StringPage;
import tech.powerjob.server.persistence.local.LocalInstanceLogDO;
import tech.powerjob.server.persistence.local.LocalInstanceLogRepository;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.storage.Constants;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import javax.annotation.Resource;
import java.io.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -58,7 +60,7 @@ public class InstanceLogService {
private InstanceMetadataService instanceMetadataService;
@Resource
private GridFsManager gridFsManager;
private DFsService dFsService;
/**
* Local database access bean
*/
@ -90,9 +92,9 @@ public class InstanceLogService {
*/
private static final int MAX_LINE_COUNT = 100;
/**
* Expiration time
* Cache duration for logs that are still being updated
*/
private static final long EXPIRE_INTERVAL_MS = 60000;
private static final long LOG_CACHE_TIME = 10000;
/**
* Submit log records for persistence into the local database
@ -214,14 +216,16 @@ public class InstanceLogService {
// Persist to a local file first
File stableLogFile = genStableLogFile(instanceId);
// Push the file to MongoDB
if (gridFsManager.available()) {
try {
gridFsManager.store(stableLogFile, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId));
log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop());
}catch (Exception e) {
log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e);
}
FileLocation dfsFL = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
try {
dFsService.store(new StoreRequest().setLocalFile(stableLogFile).setFileLocation(dfsFL));
log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop());
}catch (Exception e) {
log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e);
}
}catch (Exception e) {
log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e);
}
@ -245,7 +249,7 @@ public class InstanceLogService {
return localTransactionTemplate.execute(status -> {
File f = new File(path);
// If the file exists and is still valid, do not rebuild the log file. This check must stay inside the lock; otherwise a half-built file could be returned.
if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < EXPIRE_INTERVAL_MS) {
if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < LOG_CACHE_TIME) {
return f;
}
try {
@ -291,17 +295,14 @@ public class InstanceLogService {
}
}else {
if (!gridFsManager.available()) {
OmsFileUtils.string2File("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.", f);
return f;
}
// Otherwise pull the data from MongoDB, which covers later queries
if (!gridFsManager.exists(GridFsManager.LOG_BUCKET, genMongoFileName(instanceId))) {
FileLocation dfl = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
Optional<FileMeta> dflMetaOpt = dFsService.fetchFileMeta(dfl);
if (!dflMetaOpt.isPresent()) {
OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f);
return f;
}
gridFsManager.download(f, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId));
dFsService.download(new DownloadRequest().setTarget(f).setFileLocation(dfl));
}
return f;
}catch (Exception e) {

@ -15,10 +15,11 @@ import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.common.timewheel.holder.HashedWheelTimerHolder;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.alarm.AlarmUtils;
import tech.powerjob.server.core.service.UserService;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.extension.defaultimpl.alarm.AlarmCenter;
import tech.powerjob.server.extension.defaultimpl.alarm.module.JobInstanceAlarm;
import tech.powerjob.server.core.alarm.AlarmCenter;
import tech.powerjob.server.core.alarm.module.JobInstanceAlarm;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
@ -239,7 +240,7 @@ public class InstanceManager implements TransportServiceAware {
if (!StringUtils.isEmpty(alertContent)) {
content.setResult(alertContent);
}
alarmCenter.alarmFailed(content, userList);
alarmCenter.alarmFailed(content, AlarmUtils.convertUserInfoList2AlarmTargetList(userList));
}
@Override

@ -1,4 +1,4 @@
package tech.powerjob.server.extension.defaultimpl;
package tech.powerjob.server.core.lock;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;

@ -13,10 +13,11 @@ import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.mongodb.GridFsManager;
import tech.powerjob.server.extension.dfs.DFsService;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import tech.powerjob.server.persistence.storage.Constants;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import java.io.File;
@ -32,7 +33,7 @@ import java.util.Date;
@Service
public class CleanService {
private final GridFsManager gridFsManager;
private final DFsService dFsService;
private final InstanceInfoRepository instanceInfoRepository;
@ -57,12 +58,12 @@ public class CleanService {
private static final String HISTORY_DELETE_LOCK = "history_delete_lock";
public CleanService(GridFsManager gridFsManager, InstanceInfoRepository instanceInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository,
public CleanService(DFsService dFsService, InstanceInfoRepository instanceInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository,
WorkflowNodeInfoRepository workflowNodeInfoRepository, LockService lockService,
@Value("${oms.instanceinfo.retention}") int instanceInfoRetentionDay,
@Value("${oms.container.retention.local}") int localContainerRetentionDay,
@Value("${oms.container.retention.remote}") int remoteContainerRetentionDay) {
this.gridFsManager = gridFsManager;
this.dFsService = dFsService;
this.instanceInfoRepository = instanceInfoRepository;
this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
this.workflowNodeInfoRepository = workflowNodeInfoRepository;
@ -106,8 +107,8 @@ public class CleanService {
// Delete unused nodes
cleanWorkflowNodeInfo();
// Delete expired GridFS files
cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay);
cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay);
cleanRemote(Constants.LOG_BUCKET, instanceInfoRetentionDay);
cleanRemote(Constants.CONTAINER_BUCKET, remoteContainerRetentionDay);
} finally {
lockService.unlock(HISTORY_DELETE_LOCK);
}
@ -152,15 +153,13 @@ public class CleanService {
log.info("[CleanService] won't clean up bucket({}) because of offset day <= 0.", bucketName);
return;
}
if (gridFsManager.available()) {
Stopwatch stopwatch = Stopwatch.createStarted();
try {
gridFsManager.deleteBefore(bucketName, day);
}catch (Exception e) {
log.warn("[CleanService] clean remote bucket({}) failed.", bucketName, e);
}
log.info("[CleanService] clean remote bucket({}) successfully, using {}.", bucketName, stopwatch.stop());
Stopwatch stopwatch = Stopwatch.createStarted();
try {
dFsService.cleanExpiredFiles(bucketName, day);
}catch (Exception e) {
log.warn("[CleanService] clean remote bucket({}) failed.", bucketName, e);
}
log.info("[CleanService] clean remote bucket({}) successfully, using {}.", bucketName, stopwatch.stop());
}
@VisibleForTesting

@ -89,8 +89,12 @@ public class JobServiceImpl implements JobService {
fillDefaultValue(jobInfoDO);
// Convert the alarm user list
if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) {
jobInfoDO.setNotifyUserIds(SJ.COMMA_JOINER.join(request.getNotifyUserIds()));
if (request.getNotifyUserIds() != null) {
if (request.getNotifyUserIds().size() == 0) {
jobInfoDO.setNotifyUserIds(null);
} else {
jobInfoDO.setNotifyUserIds(SJ.COMMA_JOINER.join(request.getNotifyUserIds()));
}
}
LifeCycle lifecycle = Optional.ofNullable(request.getLifeCycle()).orElse(LifeCycle.EMPTY_LIFE_CYCLE);
jobInfoDO.setLifecycle(JSON.toJSONString(lifecycle));

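The rewritten branch distinguishes two cases the old `CollectionUtils.isEmpty` check conflated: a null list leaves the stored value untouched, while an explicitly empty list now clears it. A hedged sketch of the resulting behavior (the request type is assumed from context):

```java
request.setNotifyUserIds(null);                    // field left as-is
request.setNotifyUserIds(Collections.emptyList()); // notify users cleared
request.setNotifyUserIds(Arrays.asList(1L, 2L));   // stored as "1,2"
```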
@ -20,14 +20,15 @@ import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.alarm.AlarmUtils;
import tech.powerjob.server.core.helper.StatusMappingHelper;
import tech.powerjob.server.core.lock.UseCacheLock;
import tech.powerjob.server.core.service.UserService;
import tech.powerjob.server.core.service.WorkflowNodeHandleService;
import tech.powerjob.server.core.uid.IdGenerateService;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.extension.defaultimpl.alarm.AlarmCenter;
import tech.powerjob.server.extension.defaultimpl.alarm.module.WorkflowInstanceAlarm;
import tech.powerjob.server.core.alarm.AlarmCenter;
import tech.powerjob.server.core.alarm.module.WorkflowInstanceAlarm;
import tech.powerjob.server.persistence.remote.model.*;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
@ -458,7 +459,7 @@ public class WorkflowInstanceManager {
content.setResult(result);
List<UserInfoDO> userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds());
alarmCenter.alarmFailed(content, userList);
alarmCenter.alarmFailed(content, AlarmUtils.convertUserInfoList2AlarmTargetList(userList));
});
} catch (Exception ignore) {
// ignore

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -19,10 +19,6 @@
</properties>
<dependencies>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-persistence</artifactId>
</dependency>
</dependencies>
</project>

@ -1,17 +0,0 @@
package tech.powerjob.server.extension;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import java.util.List;
/**
* Alarm interface
*
* @author tjq
* @since 2020/4/19
*/
public interface Alarmable {
void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
}

@ -1,4 +1,4 @@
package tech.powerjob.server.extension.defaultimpl.alarm.module;
package tech.powerjob.server.extension.alarm;
import com.alibaba.fastjson.JSONObject;
import tech.powerjob.common.OmsConstant;

@ -0,0 +1,39 @@
package tech.powerjob.server.extension.alarm;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
/**
* Alarm target
*
* @author tjq
* @since 2023/7/16
*/
@Data
@Accessors(chain = true)
public class AlarmTarget implements Serializable {
private String name;
/**
* Phone number
*/
private String phone;
/**
* Email address
*/
private String email;
/**
* webHook
*/
private String webHook;
/**
* Extra field
*/
private String extra;
private Map<String, Objects> attributes;
}

@ -0,0 +1,14 @@
package tech.powerjob.server.extension.alarm;
import java.util.List;
/**
* Alarm interface
*
* @author tjq
* @since 2020/4/19
*/
public interface Alarmable {
void onFailed(Alarm alarm, List<AlarmTarget> alarmTargets);
}

@ -0,0 +1,44 @@
package tech.powerjob.server.extension.dfs;
import java.io.IOException;
import java.util.Optional;
/**
* Distributed file service
*
* @author tjq
* @since 2023/7/16
*/
public interface DFsService {
/**
* Store a file
* @param storeRequest store request
* @throws IOException on I/O error
*/
void store(StoreRequest storeRequest) throws IOException;
/**
* Download a file
* @param downloadRequest download request
* @throws IOException on I/O error
*/
void download(DownloadRequest downloadRequest) throws IOException;
/**
* Fetch file metadata
* @param fileLocation file location
* @return the file metadata, if the file exists
* @throws IOException on I/O error
*/
Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException;
/**
* Clean up files that PowerJob considers expired.
* Storage systems with built-in lifecycle management (e.g. Aliyun OSS) do not need to implement this method.
* @param bucket bucket
* @param days number of days; files older than this will be cleaned up
*/
default void cleanExpiredFiles(String bucket, int days) {
}
}

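A round-trip sketch of the new abstraction, mirroring how `ContainerService` and `InstanceLogService` use it after this change (`Constants.LOG_BUCKET` is defined in the same commit; file paths are illustrative):

```java
FileLocation location = new FileLocation().setBucket(Constants.LOG_BUCKET).setName("instance-1.log");
dFsService.store(new StoreRequest().setLocalFile(new File("/tmp/instance-1.log")).setFileLocation(location));
Optional<FileMeta> meta = dFsService.fetchFileMeta(location);
if (meta.isPresent()) {
    dFsService.download(new DownloadRequest().setFileLocation(location).setTarget(new File("/tmp/instance-1-copy.log")));
}
```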
@ -0,0 +1,22 @@
package tech.powerjob.server.extension.dfs;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.File;
import java.io.Serializable;
/**
* download request
*
* @author tjq
* @since 2023/7/16
*/
@Data
@Accessors(chain = true)
public class DownloadRequest implements Serializable {
private transient File target;
private FileLocation fileLocation;
}

@ -0,0 +1,32 @@
package tech.powerjob.server.extension.dfs;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
/**
* File location
*
* @author tjq
* @since 2023/7/16
*/
@Getter
@Setter
@Accessors(chain = true)
public class FileLocation {
/**
* Bucket
*/
private String bucket;
/**
* Name
*/
private String name;
@Override
public String toString() {
return String.format("%s.%s", bucket, name);
}
}

@ -0,0 +1,32 @@
package tech.powerjob.server.extension.dfs;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.Date;
import java.util.Map;
/**
* FileMeta
*
* @author tjq
* @since 2023/7/16
*/
@Data
@Accessors(chain = true)
public class FileMeta {
/**
* File size
*/
private long length;
/**
* Last modified time
*/
private Date lastModifiedTime;
/**
* Metadata
*/
private Map<String, Object> metaInfo;
}

@ -0,0 +1,22 @@
package tech.powerjob.server.extension.dfs;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.File;
import java.io.Serializable;
/**
* StoreRequest
*
* @author tjq
* @since 2023/7/16
*/
@Data
@Accessors(chain = true)
public class StoreRequest implements Serializable {
private transient File localFile;
private FileLocation fileLocation;
}

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -23,10 +23,23 @@
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-common</artifactId>
</dependency>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-extension</artifactId>
</dependency>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-monitor</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
</dependency>
</dependencies>
</project>

@ -1,153 +0,0 @@
package tech.powerjob.server.persistence.mongodb;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import com.mongodb.client.gridfs.GridFSFindIterable;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.model.Filters;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Service;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import java.io.*;
import java.util.Date;
import java.util.Map;
import java.util.function.Consumer;
/**
* GridFS helper
*
* @author tjq
* @since 2020/5/18
*/
@Slf4j
@Service
public class GridFsManager implements InitializingBean {
private final Environment environment;
private final MongoDatabase db;
private boolean available;
private final Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap();
public static final String LOG_BUCKET = "log";
public static final String CONTAINER_BUCKET = "container";
public GridFsManager(Environment environment, @Autowired(required = false) MongoTemplate mongoTemplate) {
this.environment = environment;
if (mongoTemplate != null) {
this.db = mongoTemplate.getDb();
} else {
this.db = null;
}
}
/**
* Whether GridFS is available
* @return true if available, false otherwise
*/
public boolean available() {
return available;
}
/**
* Store a file into GridFS
* @param localFile local file to store
* @param bucketName bucket name
* @param fileName file name in GridFS
* @throws IOException on I/O error
*/
public void store(File localFile, String bucketName, String fileName) throws IOException {
if (available()) {
GridFSBucket bucket = getBucket(bucketName);
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(localFile))) {
bucket.uploadFromStream(fileName, bis);
}
}
}
/**
* GridFS 下载文件
* @param targetFile 下载的目标文件本地文件
* @param bucketName 桶名称
* @param fileName GirdFS中的文件名称
* @throws IOException 异常
*/
public void download(File targetFile, String bucketName, String fileName) throws IOException {
if (available()) {
GridFSBucket bucket = getBucket(bucketName);
try (GridFSDownloadStream gis = bucket.openDownloadStream(fileName);
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(targetFile))
) {
byte[] buffer = new byte[1024];
int bytes = 0;
while ((bytes = gis.read(buffer)) != -1) {
bos.write(buffer, 0, bytes);
}
bos.flush();
}
}
}
/**
* Delete files created more than the given number of days ago
* @param bucketName bucket name
* @param day offset in days
*/
public void deleteBefore(String bucketName, int day) {
Stopwatch sw = Stopwatch.createStarted();
Date date = DateUtils.addDays(new Date(), -day);
GridFSBucket bucket = getBucket(bucketName);
Bson filter = Filters.lt("uploadDate", date);
// Deleting one by one performs poorly; I bet you have never read the official implementation [doge]: org.springframework.data.mongodb.gridfs.GridFsTemplate.delete
bucket.find(filter).forEach((Consumer<GridFSFile>) gridFSFile -> {
ObjectId objectId = gridFSFile.getObjectId();
try {
bucket.delete(objectId);
log.info("[GridFsManager] deleted {}#{}", bucketName, objectId);
}catch (Exception e) {
log.error("[GridFsManager] deleted {}#{} failed.", bucketName, objectId, e);
}
});
log.info("[GridFsManager] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop());
}
public boolean exists(String bucketName, String fileName) {
GridFSBucket bucket = getBucket(bucketName);
GridFSFindIterable files = bucket.find(Filters.eq("filename", fileName));
try {
GridFSFile first = files.first();
return first != null;
}catch (Exception ignore) {
}
return false;
}
private GridFSBucket getBucket(String bucketName) {
return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
}
@Override
public void afterPropertiesSet() throws Exception {
String enable = environment.getProperty(PowerJobServerConfigKey.MONGODB_ENABLE, Boolean.FALSE.toString());
available = Boolean.TRUE.toString().equals(enable) && db != null;
log.info("[GridFsManager] available: {}, db: {}", available, db);
}
}

View File

@ -0,0 +1,41 @@
package tech.powerjob.server.persistence.storage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import tech.powerjob.server.extension.dfs.DFsService;
/**
* AbstractDFsService
*
* @author tjq
* @since 2023/7/28
*/
@Slf4j
public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean {
protected ApplicationContext applicationContext;
public AbstractDFsService() {
log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName());
}
abstract protected void init(ApplicationContext applicationContext);
protected static final String PROPERTY_KEY = "oms.storage.dfs";
protected static String fetchProperty(Environment environment, String dfsType, String key) {
String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key);
return environment.getProperty(pKey);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName());
init(applicationContext);
}
}
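
For reference, fetchProperty simply joins the shared prefix with the storage type and the key, so a call like the following hypothetical one resolves the property oms.storage.dfs.alioss.endpoint from the Spring Environment:

// illustrative only: reads "oms.storage.dfs.alioss.endpoint"
String endpoint = fetchProperty(environment, "alioss", "endpoint");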

View File

@ -0,0 +1,15 @@
package tech.powerjob.server.persistence.storage;
/**
* Constants
*
* @author tjq
* @since 2023/7/30
*/
public class Constants {
public static final String LOG_BUCKET = "log";
public static final String CONTAINER_BUCKET = "container";
}

View File

@ -0,0 +1,44 @@
package tech.powerjob.server.persistence.storage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import tech.powerjob.server.extension.dfs.DFsService;
import tech.powerjob.server.persistence.storage.impl.AliOssService;
import tech.powerjob.server.persistence.storage.impl.EmptyDFsService;
import tech.powerjob.server.persistence.storage.impl.GridFsService;
import tech.powerjob.server.persistence.storage.impl.MySqlSeriesDfsService;
/**
* Initializes the built-in storage services
*
* @author tjq
* @since 2023/7/30
*/
@Configuration
public class StorageConfiguration {
@Bean
@Conditional(GridFsService.GridFsCondition.class)
public DFsService initGridFs() {
return new GridFsService();
}
@Bean
@Conditional(MySqlSeriesDfsService.MySqlSeriesCondition.class)
public DFsService initDbFs() {
return new MySqlSeriesDfsService();
}
@Bean
@Conditional(AliOssService.AliOssCondition.class)
public DFsService initAliOssFs() {
return new AliOssService();
}
@Bean
@Conditional(EmptyDFsService.EmptyCondition.class)
public DFsService initEmptyDfs() {
return new EmptyDFsService();
}
}
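
Each bean above is guarded by a PropertyAndOneBeanCondition subclass, so the presence of a configuration key decides which single DFsService implementation ends up registered, with EmptyDFsService acting as the fallback when nothing is configured. For example, a hypothetical application.properties line such as the following (placeholder URI) would activate the GridFS backend:

oms.storage.dfs.mongodb.uri=mongodb://localhost:27017/powerjob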

View File

@ -0,0 +1,229 @@
package tech.powerjob.server.persistence.storage.impl;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.DefaultCredentialProvider;
import com.aliyun.oss.model.DownloadFileRequest;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.PutObjectRequest;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.core.env.Environment;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.storage.AbstractDFsService;
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
import javax.annotation.Priority;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Alibaba OSS support
* <a href="https://www.aliyun.com/product/oss">Massive, secure, low-cost and highly reliable cloud storage</a>
* Configuration items:
* oms.storage.dfs.alioss.endpoint
* oms.storage.dfs.alioss.bucket
* oms.storage.dfs.alioss.credential_type
* oms.storage.dfs.alioss.ak
* oms.storage.dfs.alioss.sk
* oms.storage.dfs.alioss.token
*
* @author tjq
* @since 2023/7/30
*/
@Slf4j
@Priority(value = Integer.MAX_VALUE - 1)
@Conditional(AliOssService.AliOssCondition.class)
public class AliOssService extends AbstractDFsService {
private static final String TYPE_ALI_OSS = "alioss";
private static final String KEY_ENDPOINT = "endpoint";
private static final String KEY_BUCKET = "bucket";
private static final String KEY_CREDENTIAL_TYPE = "credential_type";
private static final String KEY_AK = "ak";
private static final String KEY_SK = "sk";
private static final String KEY_TOKEN = "token";
private OSS oss;
private String bucket;
private static final int DOWNLOAD_PART_SIZE = 10240;
private static final String NO_SUCH_KEY = "NoSuchKey";
@Override
public void store(StoreRequest storeRequest) throws IOException {
ObjectMetadata objectMetadata = new ObjectMetadata();
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, parseFileName(storeRequest.getFileLocation()), storeRequest.getLocalFile(), objectMetadata);
oss.putObject(putObjectRequest);
}
@Override
public void download(DownloadRequest downloadRequest) throws IOException {
FileLocation dfl = downloadRequest.getFileLocation();
DownloadFileRequest downloadFileRequest = new DownloadFileRequest(bucket, parseFileName(dfl), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE);
try {
FileUtils.forceMkdirParent(downloadRequest.getTarget());
oss.downloadFile(downloadFileRequest);
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}
@Override
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
try {
ObjectMetadata objectMetadata = oss.getObjectMetadata(bucket, parseFileName(fileLocation));
return Optional.ofNullable(objectMetadata).map(ossM -> {
Map<String, Object> metaInfo = Maps.newHashMap();
metaInfo.putAll(ossM.getRawMetadata());
if (ossM.getUserMetadata() != null) {
metaInfo.putAll(ossM.getUserMetadata());
}
return new FileMeta()
.setLastModifiedTime(ossM.getLastModified())
.setLength(ossM.getContentLength())
.setMetaInfo(metaInfo);
});
} catch (OSSException oe) {
String errorCode = oe.getErrorCode();
if (NO_SUCH_KEY.equalsIgnoreCase(errorCode)) {
return Optional.empty();
}
ExceptionUtils.rethrow(oe);
}
return Optional.empty();
}
private static String parseFileName(FileLocation fileLocation) {
return String.format("%s/%s", fileLocation.getBucket(), fileLocation.getName());
}
void initOssClient(String endpoint, String bucket, String mode, String ak, String sk, String token) throws Exception {
log.info("[AliOssService] init OSS by config: endpoint={},bucket={},credentialType={},ak={},sk={},token={}", endpoint, bucket, mode, ak, sk, token);
if (StringUtils.isEmpty(bucket)) {
throw new IllegalArgumentException("'oms.storage.dfs.alioss.bucket' can't be empty, please create a bucket in the Aliyun OSS console and then configure it for PowerJob");
}
this.bucket = bucket;
CredentialsProvider credentialsProvider;
CredentialType credentialType = CredentialType.parse(mode);
switch (credentialType) {
case PWD:
credentialsProvider = new DefaultCredentialProvider(ak, sk, token);
break;
case SYSTEM_PROPERTY:
credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider();
break;
default:
credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
}
this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider);
log.info("[AliOssService] initialize OSS successfully!");
}
@Override
public void cleanExpiredFiles(String bucket, int days) {
/*
Aliyun OSS has built-in lifecycle management; please configure it in the console instead.
It is deliberately not reimplemented here, to avoid wasting server resources: https://help.aliyun.com/zh/oss/user-guide/overview-54
*/
}
@Override
public void destroy() throws Exception {
oss.shutdown();
}
@Override
protected void init(ApplicationContext applicationContext) {
Environment environment = applicationContext.getEnvironment();
String endpoint = fetchProperty(environment, TYPE_ALI_OSS, KEY_ENDPOINT);
String bkt = fetchProperty(environment, TYPE_ALI_OSS, KEY_BUCKET);
String ct = fetchProperty(environment, TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE);
String ak = fetchProperty(environment, TYPE_ALI_OSS, KEY_AK);
String sk = fetchProperty(environment, TYPE_ALI_OSS, KEY_SK);
String token = fetchProperty(environment, TYPE_ALI_OSS, KEY_TOKEN);
try {
initOssClient(endpoint, bkt, ct, ak, sk, token);
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
}
@Getter
@AllArgsConstructor
enum CredentialType {
/**
* Read from environment variables
*/
ENV("env"),
/**
* Read from system properties
*/
SYSTEM_PROPERTY("sys"),
/**
* Read from the configured access key / secret key
*/
PWD("pwd")
;
private final String code;
/**
* parse credential type
* @param mode oms.storage.dfs.alioss.credential_type
* @return CredentialType
*/
public static CredentialType parse(String mode) {
for (CredentialType credentialType : values()) {
if (StringUtils.equalsIgnoreCase(credentialType.code, mode)) {
return credentialType;
}
}
return PWD;
}
}
public static class AliOssCondition extends PropertyAndOneBeanCondition {
@Override
protected List<String> anyConfigKey() {
return Lists.newArrayList("oms.storage.dfs.alioss.endpoint");
}
@Override
protected Class<?> beanType() {
return DFsService.class;
}
}
}
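
A hypothetical application.properties fragment enabling this backend with explicit credentials (the endpoint, bucket and key values below are placeholders):

oms.storage.dfs.alioss.endpoint=oss-cn-hangzhou.aliyuncs.com
oms.storage.dfs.alioss.bucket=my-powerjob-bucket
oms.storage.dfs.alioss.credential_type=pwd
oms.storage.dfs.alioss.ak=yourAccessKeyId
oms.storage.dfs.alioss.sk=yourAccessKeySecret

With credential_type set to env or sys, the ak/sk/token entries can be omitted and the standard Aliyun environment variables or system properties are used instead.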

View File

@ -0,0 +1,59 @@
package tech.powerjob.server.persistence.storage.impl;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Conditional;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.storage.AbstractDFsService;
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
import javax.annotation.Priority;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
/**
* EmptyDFsService
*
* @author tjq
* @since 2023/7/30
*/
@Priority(value = Integer.MAX_VALUE)
@Conditional(EmptyDFsService.EmptyCondition.class)
public class EmptyDFsService extends AbstractDFsService {
@Override
public void store(StoreRequest storeRequest) throws IOException {
}
@Override
public void download(DownloadRequest downloadRequest) throws IOException {
}
@Override
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
return Optional.empty();
}
@Override
public void destroy() throws Exception {
}
@Override
protected void init(ApplicationContext applicationContext) {
}
public static class EmptyCondition extends PropertyAndOneBeanCondition {
@Override
protected List<String> anyConfigKey() {
return null;
}
@Override
protected Class<?> beanType() {
return DFsService.class;
}
}
}

View File

@ -0,0 +1,174 @@
package tech.powerjob.server.persistence.storage.impl;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.mongodb.ConnectionString;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import com.mongodb.client.gridfs.GridFSFindIterable;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.model.Filters;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.core.env.Environment;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.storage.AbstractDFsService;
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
import javax.annotation.Priority;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Storage layer backed by MongoDB GridFS
* Configuration example: oms.storage.dfs.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority
*
* @author tjq
* @since 2023/7/28
*/
@Slf4j
@Priority(value = Integer.MAX_VALUE - 10)
@Conditional(GridFsService.GridFsCondition.class)
public class GridFsService extends AbstractDFsService {
private MongoClient mongoClient;
private MongoDatabase db;
private final Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap();
private static final String TYPE_MONGO = "mongodb";
private static final String KEY_URI = "uri";
private static final String SPRING_MONGO_DB_CONFIG_KEY = "spring.data.mongodb.uri";
@Override
public void store(StoreRequest storeRequest) throws IOException {
GridFSBucket bucket = getBucket(storeRequest.getFileLocation().getBucket());
try (BufferedInputStream bis = new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath()))) {
bucket.uploadFromStream(storeRequest.getFileLocation().getName(), bis);
}
}
@Override
public void download(DownloadRequest downloadRequest) throws IOException {
GridFSBucket bucket = getBucket(downloadRequest.getFileLocation().getBucket());
FileUtils.forceMkdirParent(downloadRequest.getTarget());
try (GridFSDownloadStream gis = bucket.openDownloadStream(downloadRequest.getFileLocation().getName());
BufferedOutputStream bos = new BufferedOutputStream(Files.newOutputStream(downloadRequest.getTarget().toPath()))
) {
byte[] buffer = new byte[1024];
int bytes = 0;
while ((bytes = gis.read(buffer)) != -1) {
bos.write(buffer, 0, bytes);
}
bos.flush();
}
}
@Override
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
GridFSBucket bucket = getBucket(fileLocation.getBucket());
GridFSFindIterable files = bucket.find(Filters.eq("filename", fileLocation.getName()));
GridFSFile first = files.first();
if (first == null) {
return Optional.empty();
}
return Optional.of(new FileMeta()
.setLength(first.getLength())
.setLastModifiedTime(first.getUploadDate())
.setMetaInfo(first.getMetadata()));
}
@Override
public void cleanExpiredFiles(String bucketName, int days) {
Stopwatch sw = Stopwatch.createStarted();
Date date = DateUtils.addDays(new Date(), -days);
GridFSBucket bucket = getBucket(bucketName);
Bson filter = Filters.lt("uploadDate", date);
// Deleting one by one performs poorly; I bet you have never read the official implementation [doge]: org.springframework.data.mongodb.gridfs.GridFsTemplate.delete
bucket.find(filter).forEach(gridFSFile -> {
ObjectId objectId = gridFSFile.getObjectId();
try {
bucket.delete(objectId);
log.info("[GridFsService] deleted {}#{}", bucketName, objectId);
}catch (Exception e) {
log.error("[GridFsService] deleted {}#{} failed.", bucketName, objectId, e);
}
});
log.info("[GridFsService] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop());
}
private GridFSBucket getBucket(String bucketName) {
return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
}
private String parseMongoUri(Environment environment) {
// prefer the new configuration key
String uri = fetchProperty(environment, TYPE_MONGO, KEY_URI);
if (StringUtils.isNotEmpty(uri)) {
return uri;
}
// for compatibility with versions before 4.3.3, fall back to the Spring MongoDB configuration
return environment.getProperty(SPRING_MONGO_DB_CONFIG_KEY);
}
void initMongo(String uri) {
log.info("[GridFsService] mongoDB uri: {}", uri);
if (StringUtils.isEmpty(uri)) {
log.warn("[GridFsService] uri is empty, GridFsService is off now!");
return;
}
ConnectionString connectionString = new ConnectionString(uri);
mongoClient = MongoClients.create(connectionString);
if (StringUtils.isEmpty(connectionString.getDatabase())) {
log.warn("[GridFsService] can't find database info from uri, will use [powerjob] as default, please make sure you have created the database 'powerjob'");
}
db = mongoClient.getDatabase(Optional.ofNullable(connectionString.getDatabase()).orElse("powerjob"));
log.info("[GridFsService] initialize MongoDB and GridFS successfully, will use mongodb GridFs as storage layer.");
}
@Override
public void destroy() throws Exception {
mongoClient.close();
}
@Override
protected void init(ApplicationContext applicationContext) {
String uri = parseMongoUri(applicationContext.getEnvironment());
initMongo(uri);
}
public static class GridFsCondition extends PropertyAndOneBeanCondition {
@Override
protected List<String> anyConfigKey() {
return Lists.newArrayList("spring.data.mongodb.uri", "oms.storage.dfs.mongodb.uri");
}
@Override
protected Class<?> beanType() {
return DFsService.class;
}
}
}
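
Either of the following hypothetical properties (placeholder host and database) activates this backend; as parseMongoUri shows, the new oms.storage.dfs key wins when both are present:

oms.storage.dfs.mongodb.uri=mongodb://localhost:27017/powerjob
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob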

View File

@ -0,0 +1,344 @@
package tech.powerjob.server.persistence.storage.impl;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.core.env.Environment;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.storage.AbstractDFsService;
import javax.annotation.Priority;
import javax.sql.DataSource;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.sql.*;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Storage backed by MySQL-compatible databases
* PS1. Uploading large files may hit the max_allowed_packet limit; raise it if needed: set global max_allowed_packet = 500*1024*1024
* PS2. Officially tested against MySQL only; please verify other databases yourself before use.
* PS3. A database is not suited to large-scale file storage; this extension targets simple scenarios only. For heavy workloads choose another backend (OSS, MongoDB, etc.)
* ********************* configuration items *********************
* oms.storage.dfs.mysql_series.driver
* oms.storage.dfs.mysql_series.url
* oms.storage.dfs.mysql_series.username
* oms.storage.dfs.mysql_series.password
* oms.storage.dfs.mysql_series.auto_create_table
* oms.storage.dfs.mysql_series.table_name
*
* @author tjq
* @since 2023/8/9
*/
@Slf4j
@Priority(value = Integer.MAX_VALUE - 2)
@Conditional(MySqlSeriesDfsService.MySqlSeriesCondition.class)
public class MySqlSeriesDfsService extends AbstractDFsService {
private DataSource dataSource;
private static final String TYPE_MYSQL = "mysql_series";
/**
* JDBC driver, e.g. com.mysql.cj.jdbc.Driver for MySQL 8
*/
private static final String KEY_DRIVER_NAME = "driver";
/**
* JDBC URL, e.g. jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
*/
private static final String KEY_URL = "url";
/**
* database username, e.g. root
*/
private static final String KEY_USERNAME = "username";
/**
* database password
*/
private static final String KEY_PASSWORD = "password";
/**
* whether to create the table automatically
*/
private static final String KEY_AUTO_CREATE_TABLE = "auto_create_table";
/**
* table name
*/
private static final String KEY_TABLE_NAME = "table_name";
/* ********************* SQL region ********************* */
private static final String DEFAULT_TABLE_NAME = "powerjob_files";
private static final String CREATE_TABLE_SQL = "CREATE TABLE\n" +
"IF\n" +
"\tNOT EXISTS %s (\n" +
"\t\t`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT 'ID',\n" +
"\t\t`bucket` VARCHAR ( 255 ) NOT NULL COMMENT '分桶',\n" +
"\t\t`name` VARCHAR ( 255 ) NOT NULL COMMENT '文件名称',\n" +
"\t\t`version` VARCHAR ( 255 ) NOT NULL COMMENT '版本',\n" +
"\t\t`meta` VARCHAR ( 255 ) COMMENT '元数据',\n" +
"\t\t`length` BIGINT NOT NULL COMMENT '长度',\n" +
"\t\t`status` INT NOT NULL COMMENT '状态',\n" +
"\t\t`data` LONGBLOB NOT NULL COMMENT '文件内容',\n" +
"\t\t`extra` VARCHAR ( 255 ) COMMENT '其他信息',\n" +
"\t\t`gmt_create` DATETIME NOT NULL COMMENT '创建时间',\n" +
"\t\t`gmt_modified` DATETIME COMMENT '更新时间',\n" +
"\tPRIMARY KEY ( id ) \n" +
"\t);";
private static final String INSERT_SQL = "insert into %s(bucket, name, version, meta, length, status, data, extra, gmt_create, gmt_modified) values (?,?,?,?,?,?,?,?,?,?);";
private static final String DELETE_SQL = "DELETE FROM %s ";
private static final String QUERY_FULL_SQL = "select * from %s";
private static final String QUERY_META_SQL = "select bucket, name, version, meta, length, status, extra, gmt_create, gmt_modified from %s";
private void deleteByLocation(FileLocation fileLocation) {
String dSQLPrefix = fullSQL(DELETE_SQL);
String dSQL = dSQLPrefix.concat(whereSQL(fileLocation));
executeDelete(dSQL);
}
private void executeDelete(String sql) {
try (Connection con = dataSource.getConnection()) {
con.createStatement().executeUpdate(sql);
} catch (Exception e) {
log.error("[MySqlSeriesDfsService] executeDelete failed, sql: {}", sql, e);
}
}
@Override
public void store(StoreRequest storeRequest) throws IOException {
Stopwatch sw = Stopwatch.createStarted();
String insertSQL = fullSQL(INSERT_SQL);
FileLocation fileLocation = storeRequest.getFileLocation();
// overwrite semantics: delete any previous record before writing
deleteByLocation(fileLocation);
Map<String, Object> meta = Maps.newHashMap();
meta.put("_local_file_path_", storeRequest.getLocalFile().getAbsolutePath());
Date date = new Date(System.currentTimeMillis());
try (Connection con = dataSource.getConnection()) {
PreparedStatement pst = con.prepareStatement(insertSQL);
pst.setString(1, fileLocation.getBucket());
pst.setString(2, fileLocation.getName());
pst.setString(3, "mu");
pst.setString(4, JsonUtils.toJSONString(meta));
pst.setLong(5, storeRequest.getLocalFile().length());
pst.setInt(6, SwitchableStatus.ENABLE.getV());
pst.setBlob(7, new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath())));
pst.setString(8, null);
pst.setDate(9, date);
pst.setDate(10, date);
pst.execute();
log.info("[MySqlSeriesDfsService] store [{}] successfully, cost: {}", fileLocation, sw);
} catch (Exception e) {
log.error("[MySqlSeriesDfsService] store [{}] failed!", fileLocation, e);
ExceptionUtils.rethrow(e);
}
}
@Override
public void download(DownloadRequest downloadRequest) throws IOException {
Stopwatch sw = Stopwatch.createStarted();
String querySQL = fullSQL(QUERY_FULL_SQL);
FileLocation fileLocation = downloadRequest.getFileLocation();
FileUtils.forceMkdirParent(downloadRequest.getTarget());
try (Connection con = dataSource.getConnection()) {
ResultSet resultSet = con.createStatement().executeQuery(querySQL.concat(whereSQL(fileLocation)));
boolean exist = resultSet.next();
if (!exist) {
log.warn("[MySqlSeriesDfsService] download file[{}] failed because it does not exist!", fileLocation);
return;
}
Blob dataBlob = resultSet.getBlob("data");
FileUtils.copyInputStreamToFile(new BufferedInputStream(dataBlob.getBinaryStream()), downloadRequest.getTarget());
log.info("[MySqlSeriesDfsService] download [{}] successfully, cost: {}", fileLocation, sw);
} catch (Exception e) {
log.error("[MySqlSeriesDfsService] download file [{}] failed!", fileLocation, e);
ExceptionUtils.rethrow(e);
}
}
@Override
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
String querySQL = fullSQL(QUERY_META_SQL);
try (Connection con = dataSource.getConnection()) {
ResultSet resultSet = con.createStatement().executeQuery(querySQL.concat(whereSQL(fileLocation)));
boolean exist = resultSet.next();
if (!exist) {
return Optional.empty();
}
FileMeta fileMeta = new FileMeta()
.setLength(resultSet.getLong("length"))
.setLastModifiedTime(resultSet.getDate("gmt_modified"))
.setMetaInfo(JsonUtils.parseMap(resultSet.getString("meta")));
return Optional.of(fileMeta);
} catch (Exception e) {
log.error("[MySqlSeriesDfsService] fetchFileMeta [{}] failed!", fileLocation, e);
ExceptionUtils.rethrow(e);
}
return Optional.empty();
}
@Override
public void cleanExpiredFiles(String bucket, int days) {
// although server-side cleanup is provided here, configuring a scheduled cleanup event directly at the database level is strongly recommended
String dSQLPrefix = fullSQL(DELETE_SQL);
final long targetTs = DateUtils.addDays(new Date(System.currentTimeMillis()), -days).getTime();
final String targetDeleteTime = CommonUtils.formatTime(targetTs);
log.info("[MySqlSeriesDfsService] start to cleanExpiredFiles, targetDeleteTime: {}", targetDeleteTime);
String fSQL = dSQLPrefix.concat(String.format(" where gmt_modified < '%s'", targetDeleteTime));
log.info("[MySqlSeriesDfsService] cleanExpiredFiles SQL: {}", fSQL);
executeDelete(fSQL);
}
@Override
protected void init(ApplicationContext applicationContext) {
Environment env = applicationContext.getEnvironment();
MySQLProperty mySQLProperty = new MySQLProperty()
.setDriver(fetchProperty(env, TYPE_MYSQL, KEY_DRIVER_NAME))
.setUrl(fetchProperty(env, TYPE_MYSQL, KEY_URL))
.setUsername(fetchProperty(env, TYPE_MYSQL, KEY_USERNAME))
.setPassword(fetchProperty(env, TYPE_MYSQL, KEY_PASSWORD))
.setAutoCreateTable(Boolean.TRUE.toString().equalsIgnoreCase(fetchProperty(env, TYPE_MYSQL, KEY_AUTO_CREATE_TABLE)))
;
try {
initDatabase(mySQLProperty);
initTable(mySQLProperty);
} catch (Exception e) {
log.error("[MySqlSeriesDfsService] init datasource failed!", e);
ExceptionUtils.rethrow(e);
}
}
void initDatabase(MySQLProperty property) {
log.info("[MySqlSeriesDfsService] init datasource by config: {}", property);
HikariConfig config = new HikariConfig();
config.setDriverClassName(property.driver);
config.setJdbcUrl(property.url);
config.setUsername(property.username);
config.setPassword(property.password);
config.setAutoCommit(true);
// minimum number of idle connections in the pool
config.setMinimumIdle(2);
// maximum pool size
config.setMaximumPoolSize(32);
dataSource = new HikariDataSource(config);
}
void initTable(MySQLProperty property) throws Exception {
if (property.autoCreateTable) {
String createTableSQL = fullSQL(CREATE_TABLE_SQL);
log.info("[MySqlSeriesDfsService] use create table SQL: {}", createTableSQL);
try (Connection connection = dataSource.getConnection()) {
connection.createStatement().execute(createTableSQL);
log.info("[MySqlSeriesDfsService] auto create table successfully!");
}
}
}
private String fullSQL(String sql) {
return String.format(sql, parseTableName());
}
private String parseTableName() {
// defensive fallback for local unit tests, where the application context is absent
if (applicationContext == null) {
return DEFAULT_TABLE_NAME;
}
String tableName = fetchProperty(applicationContext.getEnvironment(), TYPE_MYSQL, KEY_TABLE_NAME);
return StringUtils.isEmpty(tableName) ? DEFAULT_TABLE_NAME : tableName;
}
private static String whereSQL(FileLocation fileLocation) {
return String.format(" where bucket='%s' AND name='%s' ", fileLocation.getBucket(), fileLocation.getName());
}
@Override
public void destroy() throws Exception {
}
@Data
@Accessors(chain = true)
static class MySQLProperty {
private String driver;
private String url;
private String username;
private String password;
private boolean autoCreateTable;
}
public static class MySqlSeriesCondition extends PropertyAndOneBeanCondition {
@Override
protected List<String> anyConfigKey() {
return Lists.newArrayList("oms.storage.dfs.mysql_series.url");
}
@Override
protected Class<?> beanType() {
return DFsService.class;
}
}
}
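
A hypothetical configuration fragment for this backend (URL and credentials are placeholders; auto_create_table makes the service execute the CREATE TABLE statement above on startup):

oms.storage.dfs.mysql_series.driver=com.mysql.cj.jdbc.Driver
oms.storage.dfs.mysql_series.url=jdbc:mysql://localhost:3306/powerjob?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
oms.storage.dfs.mysql_series.username=root
oms.storage.dfs.mysql_series.password=yourPassword
oms.storage.dfs.mysql_series.auto_create_table=true
oms.storage.dfs.mysql_series.table_name=powerjob_files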

View File

@ -0,0 +1,88 @@
package tech.powerjob.server.persistence.storage.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.Test;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.extension.dfs.*;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
/**
* AbstractDfsServiceTest
*
* @author tjq
* @since 2023/7/30
*/
@Slf4j
public abstract class AbstractDfsServiceTest {
private static final String BUCKET = "pj_test";
abstract protected Optional<DFsService> fetchService();
@Test
void testBaseFileOperation() throws Exception {
Optional<DFsService> dfsServiceOpt = fetchService();
if (!dfsServiceOpt.isPresent()) {
return;
}
DFsService dfsService = dfsServiceOpt.get();
String content = "wlcgyqsl".concat(String.valueOf(ThreadLocalRandom.current().nextLong()));
String temporarySourcePath = OmsFileUtils.genTemporaryWorkPath() + "source.txt";
String temporaryDownloadPath = OmsFileUtils.genTemporaryWorkPath() + "download.txt";
log.info("[testBaseFileOperation] temporarySourcePath: {}", temporarySourcePath);
File sourceFile = new File(temporarySourcePath);
FileUtils.forceMkdirParent(sourceFile);
OmsFileUtils.string2File(content, sourceFile);
FileLocation fileLocation = new FileLocation().setBucket(BUCKET).setName(String.format("test_%d.txt", ThreadLocalRandom.current().nextLong()));
StoreRequest storeRequest = new StoreRequest()
.setFileLocation(fileLocation)
.setLocalFile(sourceFile);
// store
dfsService.store(storeRequest);
// fetch meta
Optional<FileMeta> metaOpt = dfsService.fetchFileMeta(fileLocation);
assert metaOpt.isPresent();
log.info("[testBaseFileOperation] file meta: {}", JsonUtils.toJSONString(metaOpt.get()));
// download
log.info("[testBaseFileOperation] temporaryDownloadPath: {}", temporaryDownloadPath);
File downloadFile = new File(temporaryDownloadPath);
DownloadRequest downloadRequest = new DownloadRequest()
.setFileLocation(fileLocation)
.setTarget(downloadFile);
dfsService.download(downloadRequest);
String downloadFileContent = FileUtils.readFileToString(downloadFile, StandardCharsets.UTF_8);
log.info("[testBaseFileOperation] download content: {}", downloadFileContent);
assert downloadFileContent.equals(content);
// scheduled cleanup: execute only, no assertions
dfsService.cleanExpiredFiles(BUCKET, 3);
}
@Test
void testFileNotExist() throws Exception {
Optional<DFsService> dfsServiceOpt = fetchService();
if (!dfsServiceOpt.isPresent()) {
return;
}
Optional<FileMeta> metaOpt = dfsServiceOpt.get().fetchFileMeta(new FileLocation().setBucket("tjq").setName("yhz"));
assert !metaOpt.isPresent();
}
}

View File

@ -0,0 +1,49 @@
package tech.powerjob.server.persistence.storage.impl;
import com.aliyun.oss.common.utils.AuthUtils;
import com.aliyun.oss.common.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.server.extension.dfs.DFsService;
import java.util.Optional;
/**
* test AliOSS
*
* @author tjq
* @since 2023/7/30
*/
@Slf4j
class AliOssServiceTest extends AbstractDfsServiceTest {
private static final String BUCKET = "power-job";
/**
* Relies on Aliyun credentials; to keep the unit tests passing in other environments, skip when the configuration is absent
* @return AliOssService
*/
@Override
protected Optional<DFsService> fetchService() {
String accessKeyId = StringUtils.trim(System.getenv(AuthUtils.ACCESS_KEY_ENV_VAR));
String secretAccessKey = StringUtils.trim(System.getenv(AuthUtils.SECRET_KEY_ENV_VAR));
String bucket = Optional.ofNullable(System.getenv("POWERJOB_OSS_BUCKET")).orElse(BUCKET);
log.info("[AliOssServiceTest] ak: {}, sk: {}", accessKeyId, secretAccessKey);
if (org.apache.commons.lang3.StringUtils.isAnyEmpty(accessKeyId, secretAccessKey)) {
return Optional.empty();
}
try {
AliOssService aliOssService = new AliOssService();
aliOssService.initOssClient("oss-cn-beijing.aliyuncs.com", bucket, AliOssService.CredentialType.ENV.getCode(), null, null, null);
return Optional.of(aliOssService);
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
return Optional.empty();
}
}

View File

@ -0,0 +1,32 @@
package tech.powerjob.server.persistence.storage.impl;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.server.common.utils.TestUtils;
import tech.powerjob.server.extension.dfs.DFsService;
import java.util.Optional;
/**
* test GridFS
*
* @author tjq
* @since 2023/7/30
*/
@Slf4j
class GridFsServiceTest extends AbstractDfsServiceTest {
@Override
protected Optional<DFsService> fetchService() {
Object mongoUri = TestUtils.fetchTestConfig().get(TestUtils.KEY_MONGO_URI);
if (mongoUri == null) {
log.info("[GridFsServiceTest] mongoUri is null, skip load!");
return Optional.empty();
}
GridFsService gridFsService = new GridFsService();
gridFsService.initMongo(String.valueOf(mongoUri));
return Optional.of(gridFsService);
}
}

View File

@ -0,0 +1,42 @@
package tech.powerjob.server.persistence.storage.impl;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.extension.dfs.DFsService;
import java.util.Optional;
/**
* MySqlSeriesDfsServiceTest
*
* @author tjq
* @since 2023/8/10
*/
class MySqlSeriesDfsServiceTest extends AbstractDfsServiceTest {
@Override
protected Optional<DFsService> fetchService() {
boolean dbAvailable = NetUtils.checkIpPortAvailable("127.0.0.1", 3306);
if (dbAvailable) {
MySqlSeriesDfsService mySqlSeriesDfsService = new MySqlSeriesDfsService();
try {
MySqlSeriesDfsService.MySQLProperty mySQLProperty = new MySqlSeriesDfsService.MySQLProperty()
.setDriver("com.mysql.cj.jdbc.Driver")
.setUrl("jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
.setUsername("root")
.setAutoCreateTable(true)
.setPassword("No1Bug2Please3!");
mySqlSeriesDfsService.initDatabase(mySQLProperty);
mySqlSeriesDfsService.initTable(mySQLProperty);
return Optional.of(mySqlSeriesDfsService);
} catch (Exception e) {
e.printStackTrace();
}
}
return Optional.empty();
}
}

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -60,9 +60,11 @@ public class ServerElectionService {
final String currentServer = request.getCurrentServer();
// if the worker's current server is this machine, skip the expensive database lookup and return directly
Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer);
return currentServer;
if (localProtocolInfoOpt.isPresent()) {
if (localProtocolInfoOpt.get().getExternalAddress().equals(currentServer) || localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
log.info("[ServerElection] this server[{}] is worker[appId={}]'s current server, skip check", currentServer, request.getAppId());
return currentServer;
}
}
}
return getServer0(request);
@ -110,13 +112,13 @@ public class ServerElectionService {
// takeover: if this server supports the protocol, it becomes the server scheduling this worker
final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
if (targetProtocolInfo != null) {
// note: the value written to AppInfoDO#currentServer is always the default address; it is only mapped to the protocol address when returned
// note: the value written to AppInfoDO#currentServer is always the default bind address; it is only mapped to the protocol address when returned
appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
appInfo.setGmtModified(new Date());
appInfoRepository.saveAndFlush(appInfo);
log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
return targetProtocolInfo.getAddress();
return targetProtocolInfo.getExternalAddress();
}
}catch (Exception e) {
log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
@ -129,10 +131,10 @@ public class ServerElectionService {
/**
* Check whether the given server is alive
* @param serverAddress server address to check
* @param serverAddress server address to check (the bound internal address)
* @param downServerCache cache to avoid sending PING repeatedly (the QPS here can be surprisingly high...)
* @param protocol protocol, used to return the matching address
* @return null or address
* @return null or address (the external address)
*/
private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {
@ -156,9 +158,10 @@ public class ServerElectionService {
final JSONObject protocolInfo = JsonUtils.parseObject(response.getData(), JSONObject.class).getJSONObject(protocol);
if (protocolInfo != null) {
downServerCache.remove(serverAddress);
final String protocolAddress = protocolInfo.toJavaObject(ProtocolInfo.class).getAddress();
log.info("[ServerElection] server[{}] is active, it will be the master, final protocol address={}", serverAddress, protocolAddress);
return protocolAddress;
ProtocolInfo remoteProtocol = protocolInfo.toJavaObject(ProtocolInfo.class);
log.info("[ServerElection] server[{}] is active, it will be the master, final protocol={}", serverAddress, remoteProtocol);
// while upgrading from 4.3.3 to 4.3.4, servers not yet upgraded have no externalAddress, so fall back to address for compatibility
return Optional.ofNullable(remoteProtocol.getExternalAddress()).orElse(remoteProtocol.getAddress());
} else {
log.warn("[ServerElection] server[{}] is active but don't have target protocol", serverAddress);
}

View File

@ -3,6 +3,9 @@ package tech.powerjob.server.remote.transporter;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.utils.PropertyUtils;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.transporter.Transporter;
/**
@ -20,11 +23,32 @@ public class ProtocolInfo {
private String address;
/**
* External address; in NAT scenarios this is the address pushed down to workers
*/
private String externalAddress;
private transient Transporter transporter;
public ProtocolInfo(String protocol, String address, Transporter transporter) {
/**
* Required for serialization; the no-arg constructor must exist, never remove it
*/
public ProtocolInfo() {
}
public ProtocolInfo(String protocol, String host, int port, Transporter transporter) {
this.protocol = protocol;
this.address = address;
this.transporter = transporter;
this.address = Address.toFullAddress(host, port);
// resolve the external address
String externalAddress = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, host);
// since each protocol may listen on a different port, the server supports a per-protocol mapping key: powerjob.network.external.port.${protocol}, e.g. powerjob.network.external.port.http
String externalPortByProtocolKey = PowerJobDKey.NT_EXTERNAL_PORT.concat(".").concat(protocol.toLowerCase());
// most users run a single protocol, so fall back to the generic key for usability and fewer support questions (with multiple protocols, only the forwarded one can communicate)
String externalPort = PropertyUtils.readProperty(externalPortByProtocolKey, PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(port)));
this.externalAddress = Address.toFullAddress(externalAddress, Integer.parseInt(externalPort));
}
}
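
In practice this lets a server behind NAT advertise a mapped address through JVM system properties; a hypothetical startup (external IP, port and jar name are placeholders) could look like:

java -Dpowerjob.network.external.address=203.0.113.10 \
     -Dpowerjob.network.external.port.http=17700 \
     -jar powerjob-server.jar

The server still binds to its internal host and port, while workers are handed 203.0.113.10:17700 as the externalAddress.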

View File

@ -120,7 +120,7 @@ public class PowerTransportService implements TransportService, InitializingBean
log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address);
this.engines.add(re);
this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter()));
this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.getHost(), address.getPort(), engineOutput.getTransporter()));
}
@Override

View File

@ -2,12 +2,11 @@ package tech.powerjob.server.remote.worker;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.DispatchStrategy;
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.extension.WorkerFilter;
import tech.powerjob.server.remote.worker.filter.WorkerFilter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.remote.server.redirector.DesignateServer;

View File

@ -1,4 +1,4 @@
package tech.powerjob.server.extension.defaultimpl.workerfilter;
package tech.powerjob.server.remote.worker.filter;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
@ -6,7 +6,6 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.extension.WorkerFilter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import java.util.Set;

View File

@ -1,6 +1,5 @@
package tech.powerjob.server.extension.defaultimpl.workerfilter;
package tech.powerjob.server.remote.worker.filter;
import tech.powerjob.server.extension.WorkerFilter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.common.module.WorkerInfo;
import lombok.extern.slf4j.Slf4j;

View File

@ -1,7 +1,6 @@
package tech.powerjob.server.extension.defaultimpl.workerfilter;
package tech.powerjob.server.remote.worker.filter;
import tech.powerjob.common.model.SystemMetrics;
import tech.powerjob.server.extension.WorkerFilter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.common.module.WorkerInfo;
import lombok.extern.slf4j.Slf4j;

View File

@ -1,4 +1,4 @@
package tech.powerjob.server.extension;
package tech.powerjob.server.remote.worker.filter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.common.module.WorkerInfo;

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -0,0 +1,75 @@
package tech.powerjob.server.web.controller;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.server.common.utils.TestUtils;
import tech.powerjob.server.core.alarm.AlarmCenter;
import tech.powerjob.server.core.alarm.module.JobInstanceAlarm;
import tech.powerjob.server.extension.alarm.AlarmTarget;
import javax.annotation.Resource;
import java.util.Map;
/**
* For the development team, or for PRO users who want to run a self-check, lol
* Tests components that depend heavily on the runtime environment, such as Mail alerts
*
* @author tjq
* @since 2023/7/31
*/
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {
@Value("${server.port}")
private int port;
@Resource
private AlarmCenter alarmCenter;
@RequestMapping("/io")
public Map<String, Object> io(@RequestBody Map<String, Object> input) {
log.info("[TestController] input: {}", JsonUtils.toJSONString(input));
return input;
}
@GetMapping("/check")
public void check() {
Map<String, Object> testConfig = TestUtils.fetchTestConfig();
if (CollectionUtils.isEmpty(testConfig)) {
log.info("[TestController] testConfig not exist, skip check!");
return;
}
log.info("[TestController] testConfig: {}", JsonUtils.toJSONString(testConfig));
testAlarmCenter();
}
void testAlarmCenter() {
JobInstanceAlarm jobInstanceAlarm = new JobInstanceAlarm().setAppId(277).setJobId(1).setInstanceId(2)
.setJobName("test-alarm").setJobParams("jobParams").setInstanceParams("instanceParams")
.setExecuteType(1).setFinishedTime(System.currentTimeMillis());
AlarmTarget target = new AlarmTarget().setName("ald").setPhone("208140").setExtra("extra")
.setPhone(MapUtils.getString(TestUtils.fetchTestConfig(), TestUtils.KEY_PHONE_NUMBER))
.setEmail("tjq@zju.edu.cn")
.setWebHook(localUrlPath().concat("/test/io"));
log.info("[TestController] start to testAlarmCenter, target: {}", target);
alarmCenter.alarmFailed(jobInstanceAlarm, Lists.newArrayList(target));
}
private String localUrlPath() {
return String.format("http://127.0.0.1:%d", port);
}
}
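
Once the server is running, the self-check endpoints can be exercised over HTTP; a hypothetical invocation against a local server (7700 is the usual PowerJob server port, adjust to your server.port):

curl http://127.0.0.1:7700/test/check
curl -X POST -H 'Content-Type: application/json' -d '{"hello":"powerjob"}' http://127.0.0.1:7700/test/io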

View File

@ -11,8 +11,7 @@ spring.datasource.core.minimum-idle=5
####### MongoDB properties(Non-core configuration properties) #######
####### delete mongodb config to disable mongodb #######
#oms.mongodb.enable=true
#spring.data.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority
#oms.storage.dfs.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority
####### Email properties(Non-core configuration properties) #######
####### Delete the following code to disable the mail #######

View File

@ -11,8 +11,7 @@ spring.datasource.core.minimum-idle=5
####### MongoDB properties(Non-core configuration properties) #######
####### delete mongodb config to disable mongodb #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-pre
oms.storage.dfs.mongodb.uri=mongodb://remotehost:27017/powerjob-pre
####### Email properties(Non-core configuration properties) #######
####### Delete the following code to disable the mail #######

View File

@ -11,8 +11,7 @@ spring.datasource.core.minimum-idle=5
####### MongoDB properties(Non-core configuration properties) #######
####### delete mongodb config to disable mongodb #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-product
oms.storage.dfs.mongodb.uri=mongodb://localhost:27017/powerjob-product
####### Email properties(Non-core configuration properties) #######
####### Delete the following code to disable the mail #######

View File

@ -1,12 +1,9 @@
package tech.powerjob.server.test;
import org.junit.jupiter.api.Disabled;
import tech.powerjob.server.extension.defaultimpl.alarm.impl.DingTalkUtils;
import com.google.common.collect.Lists;
import tech.powerjob.server.core.alarm.impl.DingTalkUtils;
import org.junit.jupiter.api.Test;
import java.util.List;
/**
* Tests the DingTalk message utility
*

View File

@ -1,61 +0,0 @@
package tech.powerjob.server.test;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import tech.powerjob.server.persistence.mongodb.GridFsManager;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
/**
* GridFS test
*
* @author tjq
* @since 2020/5/18
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class GridFsTest {
@Resource
private GridFsManager gridFsManager;
@Test
@Disabled
public void testStore() throws IOException {
/**
File file = new File("/Users/tjq/Desktop/DistributeCompute/oms-template-origin.zip");
gridFsManager.store(file, "test", "test.zip");
**/
}
@Test
@Disabled
public void testDownload() throws IOException {
/**
File file = new File("/Users/tjq/Desktop/tmp/test-download.zip");
gridFsManager.download(file, "test", "test.zip");
**/
}
@Test
@Disabled
public void testDelete() {
/**
gridFsManager.deleteBefore("fs", 0);
**/
}
@Test
@Disabled
public void testExists() {
/**
System.out.println(gridFsManager.exists("test", "test.zip"));
System.out.println(gridFsManager.exists("test", "oms-sql.sql"));
**/
}
}

View File

@ -1,56 +0,0 @@
package tech.powerjob.server.test;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.persistence.mongodb.GridFsManager;
import tech.powerjob.server.core.scheduler.CleanService;
import com.mongodb.client.gridfs.model.GridFSFile;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.gridfs.GridFsTemplate;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.Date;
import java.util.function.Consumer;
/**
* Online log test
*
* @author tjq
* @since 2020/5/11
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Disabled
public class OmsLogTest {
@Resource
private CleanService cleanService;
@Resource
private GridFsTemplate gridFsTemplate;
@Test
public void testLocalLogCleaner() {
cleanService.cleanLocal(OmsFileUtils.genLogDirPath(), 0);
}
@Test
public void testRemoteLogCleaner() {
cleanService.cleanRemote(GridFsManager.LOG_BUCKET, 0);
}
@Test
public void testGridFsQuery() {
Query mongoQuery = Query.query(Criteria.where("uploadDate").gt(new Date()));
gridFsTemplate.find(mongoQuery).forEach(new Consumer<GridFSFile>() {
@Override
public void accept(GridFSFile gridFSFile) {
System.out.println(gridFSFile.getFilename());
}
});
}
}

View File

@ -5,24 +5,24 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>4.3.3</version>
<version>4.3.4</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>4.3.3</powerjob.worker.version>
<powerjob.worker.version>4.3.4</powerjob.worker.version>
<logback.version>1.2.9</logback.version>
<picocli.version>4.3.2</picocli.version>
<spring.version>5.3.23</spring.version>
<spring.boot.version>2.3.4.RELEASE</spring.boot.version>
<powerjob.official.processors.version>4.3.3</powerjob.official.processors.version>
<powerjob.official.processors.version>4.3.4</powerjob.official.processors.version>
<!-- dependency for dynamic sql processor -->
<mysql.version>8.0.28</mysql.version>

View File

@ -5,18 +5,18 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>4.3.3</version>
<version>4.3.4</version>
<properties>
<springboot.version>2.7.4</springboot.version>
<powerjob.worker.starter.version>4.3.3</powerjob.worker.starter.version>
<powerjob.worker.starter.version>4.3.4</powerjob.worker.starter.version>
<fastjson.version>1.2.83</fastjson.version>
<powerjob.official.processors.version>4.3.3</powerjob.official.processors.version>
<powerjob.official.processors.version>4.3.4</powerjob.official.processors.version>
<!-- skip this module during deployment -->
<maven.deploy.skip>true</maven.deploy.skip>

View File

@ -17,6 +17,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
/**
@ -40,7 +41,7 @@ public class MapReduceProcessorDemo implements MapReduceProcessor {
log.info("taskContext:{}", JsonUtils.toJSONString(context));
// read the MapReduce batch count and batch size from the console job params
final JSONObject jobParams = JSONObject.parseObject(context.getJobParams());
final JSONObject jobParams = Optional.ofNullable(context.getJobParams()).map(JSONObject::parseObject).orElse(new JSONObject());
Integer batchSize = (Integer) jobParams.getOrDefault("batchSize", 100);
Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10);
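
The Optional wrapper above matters because jobParams is null unless parameters are configured in the console; when present, they are a JSON document along the lines of this illustrative example:

{"batchSize": 200, "batchNum": 5}

Without params, the defaults of 100 and 10 apply.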

View File

@ -5,16 +5,16 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>4.3.3</version>
<version>4.3.4</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>4.3.3</powerjob.worker.version>
<powerjob.worker.version>4.3.4</powerjob.worker.version>
<springboot.version>2.7.4</springboot.version>
</properties>

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.3</version>
<version>4.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>4.3.3</version>
<version>4.3.4</version>
<packaging>jar</packaging>
<properties>
@ -21,10 +21,10 @@
<logback.version>1.2.9</logback.version>
<powerjob-common.version>4.3.3</powerjob-common.version>
<powerjob-remote-framework.version>4.3.3</powerjob-remote-framework.version>
<powerjob-remote-impl-akka.version>4.3.3</powerjob-remote-impl-akka.version>
<powerjob-remote-impl-http.version>4.3.3</powerjob-remote-impl-http.version>
<powerjob-common.version>4.3.4</powerjob-common.version>
<powerjob-remote-framework.version>4.3.4</powerjob-remote-framework.version>
<powerjob-remote-impl-akka.version>4.3.4</powerjob-remote-impl-akka.version>
<powerjob-remote-impl-http.version>4.3.4</powerjob-remote-impl-http.version>
</properties>
<dependencies>

View File

@ -3,12 +3,14 @@ package tech.powerjob.worker;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.PropertyUtils;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.engine.EngineConfig;
@ -79,9 +81,13 @@ public class PowerJobWorker {
log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
}
// initialize metadata
String workerAddress = NetUtils.getLocalHost() + ":" + config.getPort();
workerRuntime.setWorkerAddress(workerAddress);
// initialize network info: the reported (external) address and the local bind address are handled separately; the reported address is used everywhere externally
String localBindIp = NetUtils.getLocalHost();
int localBindPort = config.getPort();
String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, localBindIp);
String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort));
log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort);
workerRuntime.setWorkerAddress(Address.toFullAddress(externalIp, Integer.parseInt(externalPort)));
// initialize thread pools
final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());
@ -100,7 +106,7 @@ public class PowerJobWorker {
EngineConfig engineConfig = new EngineConfig()
.setType(config.getProtocol().name())
.setServerType(ServerType.WORKER)
.setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort()))
.setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort))
.setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));
EngineOutput engineOutput = remoteEngine.start(engineConfig);
@ -115,7 +121,7 @@ public class PowerJobWorker {
log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");
// initialize the log system
OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, workerRuntime.getTransporter(), serverDiscoveryService);
OmsLogHandler omsLogHandler = new OmsLogHandler(workerRuntime.getWorkerAddress(), workerRuntime.getTransporter(), serverDiscoveryService);
workerRuntime.setOmsLogHandler(omsLogHandler);
// initialize storage
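
Mirroring the server side, a worker behind NAT can now report a mapped address while still binding locally; a hypothetical launch (address, port and jar name are placeholders) could look like:

java -Dpowerjob.network.external.address=198.51.100.7 \
     -Dpowerjob.network.external.port=27777 \
     -jar my-worker-app.jar

The bind address stays NetUtils.getLocalHost() plus config.getPort(), while servers see 198.51.100.7:27777 as the worker address.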

View File

@ -73,6 +73,7 @@ public abstract class TaskTracker {
instanceInfo.setThreadConcurrency(req.getThreadConcurrency());
instanceInfo.setTaskRetryNum(req.getTaskRetryNum());
instanceInfo.setLogConfig(req.getLogConfig());
instanceInfo.setInstanceTimeoutMS(req.getInstanceTimeoutMS());
// special handling for the instance timeout
if (instanceInfo.getInstanceTimeoutMS() <= 0) {