diff --git a/SECURITY.md b/SECURITY.md
new file mode 100644
index 00000000..2ada85b7
--- /dev/null
+++ b/SECURITY.md
@@ -0,0 +1,4 @@
+# Security notices relating to PowerJob
+
+Please disclose any security issues or vulnerabilities through [Tidelift's coordinated disclosure system](https://tidelift.com/security) or privately to the maintainers (tengjiqi@gmail.com).
+
diff --git a/others/dev/publish_docker.sh b/others/dev/publish_docker.sh
index ee295e3d..7b3da88e 100755
--- a/others/dev/publish_docker.sh
+++ b/others/dev/publish_docker.sh
@@ -90,7 +90,7 @@ if [ "$startup" = "y" ] || [ "$startup" = "Y" ]; then
echo "================== 准备启动 powerjob-server =================="
docker run -d \
--name powerjob-server \
- -p 7700:7700 -p 10086:10086 -p 5001:5005 -p 10001:10000 \
+ -p 7700:7700 -p 10086:10086 -p 10010:10010 -p 5001:5005 -p 10001:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
-e PARAMS="--spring.profiles.active=pre" \
-e TZ="Asia/Shanghai" \
diff --git a/pom.xml b/pom.xml
index f946cf27..6008d9c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>tech.powerjob</groupId>
<artifactId>powerjob</artifactId>
- <version>4.3.3</version>
+ <version>4.3.4</version>
<packaging>pom</packaging>
<name>powerjob</name>
<url>http://www.powerjob.tech</url>
diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml
index dfacf993..fea0a769 100644
--- a/powerjob-client/pom.xml
+++ b/powerjob-client/pom.xml
@@ -5,18 +5,18 @@
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
- <version>4.3.3</version>
+ <version>4.3.4</version>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
- <version>4.3.3</version>
+ <version>4.3.4</version>
<packaging>jar</packaging>
5.9.1
1.2.83
- 4.3.3
+ 4.3.4
3.2.4
diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml
index 179357d0..a168f772 100644
--- a/powerjob-common/pom.xml
+++ b/powerjob-common/pom.xml
@@ -5,12 +5,12 @@
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
- <version>4.3.3</version>
+ <version>4.3.4</version>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
- <version>4.3.3</version>
+ <version>4.3.4</version>
<packaging>jar</packaging>
diff --git a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java
index aa27ea48..92c3a353 100644
--- a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java
+++ b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java
@@ -5,7 +5,6 @@ import java.net.NetworkInterface;
/**
* 通过 JVM 启动参数传入的配置信息
*
- *
* @author tjq
* @since 2020/8/8
*/
@@ -16,7 +15,15 @@ public class PowerJobDKey {
*/
public static final String PREFERRED_NETWORK_INTERFACE = "powerjob.network.interface.preferred";
+ /**
+ * Bind address; usually the address of the local network interface
+ */
public static final String BIND_LOCAL_ADDRESS = "powerjob.network.local.address";
+ /**
+ * External address, optional; defaults to the bind address. In NAT scenarios, communication can be enabled by passing a separate external address
+ */
+ public static final String NT_EXTERNAL_ADDRESS = "powerjob.network.external.address";
+ public static final String NT_EXTERNAL_PORT = "powerjob.network.external.port";
/**
* Java regular expressions for network interfaces that will be ignored.
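For readers unfamiliar with the new keys above: BIND_LOCAL_ADDRESS is the address the process binds to, while NT_EXTERNAL_ADDRESS / NT_EXTERNAL_PORT are optional and fall back to the bind address, covering NAT-style setups. A minimal, hypothetical sketch of how such -D properties could be supplied and read; the resolution logic below is illustrative only and not taken from this diff.

    // Example JVM arguments (illustrative):
    //   -Dpowerjob.network.local.address=172.16.0.8
    //   -Dpowerjob.network.external.address=203.0.113.10
    //   -Dpowerjob.network.external.port=27777
    public class AddressSketch {
        public static void main(String[] args) {
            String bind = System.getProperty("powerjob.network.local.address");
            // the external address is optional and falls back to the bind address
            String advertised = System.getProperty("powerjob.network.external.address", bind);
            String advertisedPort = System.getProperty("powerjob.network.external.port");
            System.out.println("listen on " + bind + ", advertise " + advertised + ":" + advertisedPort);
        }
    }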
diff --git a/powerjob-common/src/main/java/tech/powerjob/common/exception/ImpossibleException.java b/powerjob-common/src/main/java/tech/powerjob/common/exception/ImpossibleException.java
new file mode 100644
index 00000000..39803498
--- /dev/null
+++ b/powerjob-common/src/main/java/tech/powerjob/common/exception/ImpossibleException.java
@@ -0,0 +1,10 @@
+package tech.powerjob.common.exception;
+
+/**
+ * ImpossibleException
+ *
+ * @author tjq
+ * @since 2023/7/12
+ */
+public class ImpossibleException extends RuntimeException {
+}
diff --git a/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java
index 085da18c..0480357a 100644
--- a/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java
+++ b/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java
@@ -8,9 +8,12 @@ import com.fasterxml.jackson.databind.json.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import tech.powerjob.common.exception.ImpossibleException;
import tech.powerjob.common.exception.PowerJobException;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
/**
* JSON工具类
@@ -27,6 +30,8 @@ public class JsonUtils {
.configure(JsonParser.Feature.IGNORE_UNDEFINED, true)
.build();
+ private static final TypeReference
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/Alarmable.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/Alarmable.java
deleted file mode 100644
index 0cec4398..00000000
--- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/Alarmable.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package tech.powerjob.server.extension;
-
-import tech.powerjob.server.persistence.remote.model.UserInfoDO;
-import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
-
-import java.util.List;
-
-/**
- * 报警接口
- *
- * @author tjq
- * @since 2020/4/19
- */
-public interface Alarmable {
-
- void onFailed(Alarm alarm, List targetUserList);
-}
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/module/Alarm.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/alarm/Alarm.java
similarity index 94%
rename from powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/module/Alarm.java
rename to powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/alarm/Alarm.java
index 76708113..6d76986b 100644
--- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/module/Alarm.java
+++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/alarm/Alarm.java
@@ -1,4 +1,4 @@
-package tech.powerjob.server.extension.defaultimpl.alarm.module;
+package tech.powerjob.server.extension.alarm;
import com.alibaba.fastjson.JSONObject;
import tech.powerjob.common.OmsConstant;
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/alarm/AlarmTarget.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/alarm/AlarmTarget.java
new file mode 100644
index 00000000..982ba398
--- /dev/null
+++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/alarm/AlarmTarget.java
@@ -0,0 +1,39 @@
+package tech.powerjob.server.extension.alarm;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Alarm target
+ *
+ * @author tjq
+ * @since 2023/7/16
+ */
+@Data
+@Accessors(chain = true)
+public class AlarmTarget implements Serializable {
+
+ private String name;
+ /**
+ * Phone number
+ */
+ private String phone;
+ /**
+ * Email address
+ */
+ private String email;
+ /**
+ * webHook
+ */
+ private String webHook;
+ /**
+ * Extension field
+ */
+ private String extra;
+
+ private Map<String, Object> attributes;
+}
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/alarm/Alarmable.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/alarm/Alarmable.java
new file mode 100644
index 00000000..7ede922e
--- /dev/null
+++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/alarm/Alarmable.java
@@ -0,0 +1,14 @@
+package tech.powerjob.server.extension.alarm;
+
+import java.util.List;
+
+/**
+ * Alarm interface
+ *
+ * @author tjq
+ * @since 2020/4/19
+ */
+public interface Alarmable {
+
+ void onFailed(Alarm alarm, List<AlarmTarget> alarmTargets);
+}
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DFsService.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DFsService.java
new file mode 100644
index 00000000..fdb77b6b
--- /dev/null
+++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DFsService.java
@@ -0,0 +1,44 @@
+package tech.powerjob.server.extension.dfs;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Distributed file service
+ *
+ * @author tjq
+ * @since 2023/7/16
+ */
+public interface DFsService {
+
+ /**
+ * Store a file
+ * @param storeRequest store request
+ * @throws IOException exception
+ */
+ void store(StoreRequest storeRequest) throws IOException;
+
+ /**
+ * Download a file
+ * @param downloadRequest file download request
+ * @throws IOException exception
+ */
+ void download(DownloadRequest downloadRequest) throws IOException;
+
+ /**
+ * Fetch file metadata
+ * @param fileLocation file location
+ * @return the file metadata if the file exists
+ * @throws IOException exception
+ */
+ Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException;
+
+ /**
+ * Clean up files that PowerJob considers "expired"
+ * Some storage systems provide built-in lifecycle management (e.g. Alibaba Cloud OSS), in which case this method does not need to be implemented
+ * @param bucket bucket
+ * @param days number of days; files older than X days should be cleaned up
+ */
+ default void cleanExpiredFiles(String bucket, int days) {
+ }
+}
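A brief usage sketch of the new abstraction; the bucket and file names below are made up for illustration, and the concrete DFsService implementations appear later in this diff.

    import tech.powerjob.server.extension.dfs.*;

    import java.io.File;
    import java.util.Optional;

    public class DfsUsageSketch {
        public static void roundTrip(DFsService dfsService) throws Exception {
            FileLocation location = new FileLocation().setBucket("log").setName("12345.log");
            // upload a local file
            dfsService.store(new StoreRequest().setFileLocation(location).setLocalFile(new File("/tmp/12345.log")));
            // check that it exists and inspect its metadata
            Optional<FileMeta> meta = dfsService.fetchFileMeta(location);
            meta.ifPresent(m -> System.out.println("stored " + m.getLength() + " bytes"));
            // download it back to another local path
            dfsService.download(new DownloadRequest().setFileLocation(location).setTarget(new File("/tmp/download/12345.log")));
        }
    }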
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DownloadRequest.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DownloadRequest.java
new file mode 100644
index 00000000..e3288f36
--- /dev/null
+++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/DownloadRequest.java
@@ -0,0 +1,22 @@
+package tech.powerjob.server.extension.dfs;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.Serializable;
+
+/**
+ * download request
+ *
+ * @author tjq
+ * @since 2023/7/16
+ */
+@Data
+@Accessors(chain = true)
+public class DownloadRequest implements Serializable {
+
+ private transient File target;
+
+ private FileLocation fileLocation;
+}
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileLocation.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileLocation.java
new file mode 100644
index 00000000..0eda207d
--- /dev/null
+++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileLocation.java
@@ -0,0 +1,32 @@
+package tech.powerjob.server.extension.dfs;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+/**
+ * File location
+ *
+ * @author tjq
+ * @since 2023/7/16
+ */
+@Getter
+@Setter
+@Accessors(chain = true)
+public class FileLocation {
+
+ /**
+ * Storage bucket
+ */
+ private String bucket;
+
+ /**
+ * Name
+ */
+ private String name;
+
+ @Override
+ public String toString() {
+ return String.format("%s.%s", bucket, name);
+ }
+}
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java
new file mode 100644
index 00000000..077701aa
--- /dev/null
+++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/FileMeta.java
@@ -0,0 +1,32 @@
+package tech.powerjob.server.extension.dfs;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * FileMeta
+ *
+ * @author tjq
+ * @since 2023/7/16
+ */
+@Data
+@Accessors(chain = true)
+public class FileMeta {
+
+ /**
+ * File size
+ */
+ private long length;
+ /**
+ * 最后修改时间
+ */
+ private Date lastModifiedTime;
+
+ /**
+ * Metadata
+ */
+ private Map<String, Object> metaInfo;
+}
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/StoreRequest.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/StoreRequest.java
new file mode 100644
index 00000000..1933343c
--- /dev/null
+++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/dfs/StoreRequest.java
@@ -0,0 +1,22 @@
+package tech.powerjob.server.extension.dfs;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.Serializable;
+
+/**
+ * StoreRequest
+ *
+ * @author tjq
+ * @since 2023/7/16
+ */
+@Data
+@Accessors(chain = true)
+public class StoreRequest implements Serializable {
+
+ private transient File localFile;
+
+ private FileLocation fileLocation;
+}
diff --git a/powerjob-server/powerjob-server-migrate/pom.xml b/powerjob-server/powerjob-server-migrate/pom.xml
index 6d245066..f4f54b1c 100644
--- a/powerjob-server/powerjob-server-migrate/pom.xml
+++ b/powerjob-server/powerjob-server-migrate/pom.xml
@@ -5,7 +5,7 @@
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
- <version>4.3.3</version>
+ <version>4.3.4</version>
<relativePath>../pom.xml</relativePath>
<modelVersion>4.0.0</modelVersion>
diff --git a/powerjob-server/powerjob-server-monitor/pom.xml b/powerjob-server/powerjob-server-monitor/pom.xml
index 6158c254..ebe7c793 100644
--- a/powerjob-server/powerjob-server-monitor/pom.xml
+++ b/powerjob-server/powerjob-server-monitor/pom.xml
@@ -5,7 +5,7 @@
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
- <version>4.3.3</version>
+ <version>4.3.4</version>
<relativePath>../pom.xml</relativePath>
<modelVersion>4.0.0</modelVersion>
diff --git a/powerjob-server/powerjob-server-persistence/pom.xml b/powerjob-server/powerjob-server-persistence/pom.xml
index c5494ae9..48b275de 100644
--- a/powerjob-server/powerjob-server-persistence/pom.xml
+++ b/powerjob-server/powerjob-server-persistence/pom.xml
@@ -5,7 +5,7 @@
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
- <version>4.3.3</version>
+ <version>4.3.4</version>
<relativePath>../pom.xml</relativePath>
<modelVersion>4.0.0</modelVersion>
@@ -23,10 +23,23 @@
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-common</artifactId>
+ <dependency>
+ <groupId>tech.powerjob</groupId>
+ <artifactId>powerjob-server-extension</artifactId>
+ </dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-monitor</artifactId>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-sync</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.aliyun.oss</groupId>
+ <artifactId>aliyun-sdk-oss</artifactId>
+ </dependency>
\ No newline at end of file
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/mongodb/GridFsManager.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/mongodb/GridFsManager.java
deleted file mode 100644
index d275a0fd..00000000
--- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/mongodb/GridFsManager.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package tech.powerjob.server.persistence.mongodb;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Maps;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.gridfs.GridFSBucket;
-import com.mongodb.client.gridfs.GridFSBuckets;
-import com.mongodb.client.gridfs.GridFSDownloadStream;
-import com.mongodb.client.gridfs.GridFSFindIterable;
-import com.mongodb.client.gridfs.model.GridFSFile;
-import com.mongodb.client.model.Filters;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.time.DateUtils;
-import org.bson.conversions.Bson;
-import org.bson.types.ObjectId;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.core.env.Environment;
-import org.springframework.data.mongodb.core.MongoTemplate;
-import org.springframework.stereotype.Service;
-import tech.powerjob.server.common.PowerJobServerConfigKey;
-
-import java.io.*;
-import java.util.Date;
-import java.util.Map;
-import java.util.function.Consumer;
-
-/**
- * GridFS 操作助手
- *
- * @author tjq
- * @since 2020/5/18
- */
-@Slf4j
-@Service
-public class GridFsManager implements InitializingBean {
-
- private final Environment environment;
-
- private final MongoDatabase db;
-
- private boolean available;
-
- private final Map bucketCache = Maps.newConcurrentMap();
-
- public static final String LOG_BUCKET = "log";
-
- public static final String CONTAINER_BUCKET = "container";
-
- public GridFsManager(Environment environment, @Autowired(required = false) MongoTemplate mongoTemplate) {
- this.environment = environment;
- if (mongoTemplate != null) {
- this.db = mongoTemplate.getDb();
- } else {
- this.db = null;
- }
- }
-
- /**
- * 是否可用
- * @return true:可用;false:不可用
- */
- public boolean available() {
- return available;
- }
-
- /**
- * 存储文件到 GridFS
- * @param localFile 本地文件
- * @param bucketName 桶名称
- * @param fileName GirdFS中的文件名称
- * @throws IOException 异常
- */
- public void store(File localFile, String bucketName, String fileName) throws IOException {
- if (available()) {
- GridFSBucket bucket = getBucket(bucketName);
- try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(localFile))) {
- bucket.uploadFromStream(fileName, bis);
- }
- }
- }
-
- /**
- * 从 GridFS 下载文件
- * @param targetFile 下载的目标文件(本地文件)
- * @param bucketName 桶名称
- * @param fileName GirdFS中的文件名称
- * @throws IOException 异常
- */
- public void download(File targetFile, String bucketName, String fileName) throws IOException {
- if (available()) {
- GridFSBucket bucket = getBucket(bucketName);
- try (GridFSDownloadStream gis = bucket.openDownloadStream(fileName);
- BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(targetFile))
- ) {
- byte[] buffer = new byte[1024];
- int bytes = 0;
- while ((bytes = gis.read(buffer)) != -1) {
- bos.write(buffer, 0, bytes);
- }
- bos.flush();
- }
- }
- }
-
- /**
- * 删除几天前的文件
- * @param bucketName 桶名称
- * @param day 日期偏移量,单位 天
- */
- public void deleteBefore(String bucketName, int day) {
-
- Stopwatch sw = Stopwatch.createStarted();
-
- Date date = DateUtils.addDays(new Date(), -day);
- GridFSBucket bucket = getBucket(bucketName);
- Bson filter = Filters.lt("uploadDate", date);
-
- // 循环删除性能很差?我猜你肯定没看过官方实现[狗头]:org.springframework.data.mongodb.gridfs.GridFsTemplate.delete
- bucket.find(filter).forEach((Consumer) gridFSFile -> {
- ObjectId objectId = gridFSFile.getObjectId();
- try {
- bucket.delete(objectId);
- log.info("[GridFsManager] deleted {}#{}", bucketName, objectId);
- }catch (Exception e) {
- log.error("[GridFsManager] deleted {}#{} failed.", bucketName, objectId, e);
- }
- });
- log.info("[GridFsManager] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop());
- }
-
- public boolean exists(String bucketName, String fileName) {
- GridFSBucket bucket = getBucket(bucketName);
- GridFSFindIterable files = bucket.find(Filters.eq("filename", fileName));
- try {
- GridFSFile first = files.first();
- return first != null;
- }catch (Exception ignore) {
- }
- return false;
- }
-
- private GridFSBucket getBucket(String bucketName) {
- return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- String enable = environment.getProperty(PowerJobServerConfigKey.MONGODB_ENABLE, Boolean.FALSE.toString());
- available = Boolean.TRUE.toString().equals(enable) && db != null;
- log.info("[GridFsManager] available: {}, db: {}", available, db);
- }
-}
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java
new file mode 100644
index 00000000..edb52a8c
--- /dev/null
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java
@@ -0,0 +1,41 @@
+package tech.powerjob.server.persistence.storage;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.core.env.Environment;
+import tech.powerjob.server.extension.dfs.DFsService;
+
+/**
+ * AbstractDFsService
+ *
+ * @author tjq
+ * @since 2023/7/28
+ */
+@Slf4j
+public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean {
+
+ protected ApplicationContext applicationContext;
+
+ public AbstractDFsService() {
+ log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName());
+ }
+
+ abstract protected void init(ApplicationContext applicationContext);
+
+ protected static final String PROPERTY_KEY = "oms.storage.dfs";
+
+ protected static String fetchProperty(Environment environment, String dfsType, String key) {
+ String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key);
+ return environment.getProperty(pKey);
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName());
+ init(applicationContext);
+ }
+}
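The fetchProperty helper establishes the oms.storage.dfs.<type>.<key> naming convention that every backend in this diff follows. Below is a skeletal, hypothetical example of a third-party backend built on the same contract; the WebDAV type and its single endpoint key are invented purely for illustration and are not part of this PR.

    import org.springframework.context.ApplicationContext;
    import tech.powerjob.server.extension.dfs.*;
    import tech.powerjob.server.persistence.storage.AbstractDFsService;

    import java.io.IOException;
    import java.util.Optional;

    public class WebDavDfsService extends AbstractDFsService {

        private String endpoint;

        @Override
        protected void init(ApplicationContext applicationContext) {
            // resolves "oms.storage.dfs.webdav.endpoint"
            this.endpoint = fetchProperty(applicationContext.getEnvironment(), "webdav", "endpoint");
        }

        @Override
        public void store(StoreRequest storeRequest) throws IOException {
            // upload storeRequest.getLocalFile() to the WebDAV server at 'endpoint'
        }

        @Override
        public void download(DownloadRequest downloadRequest) throws IOException {
            // fetch the remote file into downloadRequest.getTarget()
        }

        @Override
        public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
            return Optional.empty();
        }

        @Override
        public void destroy() {
            // release client resources
        }
    }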
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/Constants.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/Constants.java
new file mode 100644
index 00000000..dd7fcd69
--- /dev/null
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/Constants.java
@@ -0,0 +1,15 @@
+package tech.powerjob.server.persistence.storage;
+
+/**
+ * Constants
+ *
+ * @author tjq
+ * @since 2023/7/30
+ */
+public class Constants {
+
+ public static final String LOG_BUCKET = "log";
+
+ public static final String CONTAINER_BUCKET = "container";
+
+}
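These bucket names pair with the FileLocation type introduced above. A small illustrative sketch follows; the file-naming scheme is hypothetical, since the server-side callers are not part of this diff.

    import tech.powerjob.server.extension.dfs.FileLocation;
    import tech.powerjob.server.persistence.storage.Constants;

    public class BucketSketch {

        static FileLocation onlineLogLocation(long instanceId) {
            // online logs are grouped under the "log" bucket
            return new FileLocation().setBucket(Constants.LOG_BUCKET).setName(instanceId + ".log");
        }

        static FileLocation containerLocation(long containerId, String version) {
            // container jar packages go to the "container" bucket
            return new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(containerId + "-" + version + ".jar");
        }
    }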
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/StorageConfiguration.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/StorageConfiguration.java
new file mode 100644
index 00000000..6b5627fe
--- /dev/null
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/StorageConfiguration.java
@@ -0,0 +1,44 @@
+package tech.powerjob.server.persistence.storage;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.context.annotation.Configuration;
+import tech.powerjob.server.extension.dfs.DFsService;
+import tech.powerjob.server.persistence.storage.impl.AliOssService;
+import tech.powerjob.server.persistence.storage.impl.EmptyDFsService;
+import tech.powerjob.server.persistence.storage.impl.GridFsService;
+import tech.powerjob.server.persistence.storage.impl.MySqlSeriesDfsService;
+
+/**
+ * Initializes the built-in storage services
+ *
+ * @author tjq
+ * @since 2023/7/30
+ */
+@Configuration
+public class StorageConfiguration {
+
+ @Bean
+ @Conditional(GridFsService.GridFsCondition.class)
+ public DFsService initGridFs() {
+ return new GridFsService();
+ }
+
+ @Bean
+ @Conditional(MySqlSeriesDfsService.MySqlSeriesCondition.class)
+ public DFsService initDbFs() {
+ return new MySqlSeriesDfsService();
+ }
+
+ @Bean
+ @Conditional(AliOssService.AliOssCondition.class)
+ public DFsService initAliOssFs() {
+ return new AliOssService();
+ }
+
+ @Bean
+ @Conditional(EmptyDFsService.EmptyCondition.class)
+ public DFsService initEmptyDfs() {
+ return new EmptyDFsService();
+ }
+}
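Each built-in bean is guarded by a PropertyAndOneBeanCondition subclass; judging by its name and the anyConfigKey()/beanType() hooks used below, it appears to activate a backend only when one of its configuration keys is present and no other DFsService bean exists. That is an assumption (the condition class itself is not shown in this diff), but under it a custom storage backend could be plugged in as an ordinary bean:

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import tech.powerjob.server.extension.dfs.DFsService;

    @Configuration
    public class CustomStorageConfiguration {

        @Bean
        public DFsService customDfsService() {
            // hypothetical user-provided backend, e.g. the WebDavDfsService sketched earlier
            return new WebDavDfsService();
        }
    }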
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java
new file mode 100644
index 00000000..7ae36243
--- /dev/null
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/AliOssService.java
@@ -0,0 +1,229 @@
+package tech.powerjob.server.persistence.storage.impl;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import com.aliyun.oss.common.auth.CredentialsProviderFactory;
+import com.aliyun.oss.common.auth.DefaultCredentialProvider;
+import com.aliyun.oss.model.DownloadFileRequest;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.aliyun.oss.model.PutObjectRequest;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.core.env.Environment;
+import tech.powerjob.server.extension.dfs.*;
+import tech.powerjob.server.persistence.storage.AbstractDFsService;
+import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
+
+import javax.annotation.Priority;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Alibaba OSS support
+ * A massive, secure, low-cost and highly reliable cloud storage service
+ * Configuration items:
+ * oms.storage.dfs.alioss.endpoint
+ * oms.storage.dfs.alioss.bucket
+ * oms.storage.dfs.alioss.credential_type
+ * oms.storage.dfs.alioss.ak
+ * oms.storage.dfs.alioss.sk
+ * oms.storage.dfs.alioss.token
+ *
+ * @author tjq
+ * @since 2023/7/30
+ */
+@Slf4j
+@Priority(value = Integer.MAX_VALUE - 1)
+@Conditional(AliOssService.AliOssCondition.class)
+public class AliOssService extends AbstractDFsService {
+
+ private static final String TYPE_ALI_OSS = "alioss";
+
+ private static final String KEY_ENDPOINT = "endpoint";
+ private static final String KEY_BUCKET = "bucket";
+ private static final String KEY_CREDENTIAL_TYPE = "credential_type";
+ private static final String KEY_AK = "ak";
+ private static final String KEY_SK = "sk";
+ private static final String KEY_TOKEN = "token";
+
+ private OSS oss;
+ private String bucket;
+
+ private static final int DOWNLOAD_PART_SIZE = 10240;
+
+ private static final String NO_SUCH_KEY = "NoSuchKey";
+
+ @Override
+ public void store(StoreRequest storeRequest) throws IOException {
+
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+
+ PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, parseFileName(storeRequest.getFileLocation()), storeRequest.getLocalFile(), objectMetadata);
+ oss.putObject(putObjectRequest);
+ }
+
+ @Override
+ public void download(DownloadRequest downloadRequest) throws IOException {
+
+ FileLocation dfl = downloadRequest.getFileLocation();
+ DownloadFileRequest downloadFileRequest = new DownloadFileRequest(bucket, parseFileName(dfl), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE);
+ try {
+ FileUtils.forceMkdirParent(downloadRequest.getTarget());
+ oss.downloadFile(downloadFileRequest);
+ } catch (Throwable t) {
+ ExceptionUtils.rethrow(t);
+ }
+ }
+
+ @Override
+ public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
+ try {
+ ObjectMetadata objectMetadata = oss.getObjectMetadata(bucket, parseFileName(fileLocation));
+ return Optional.ofNullable(objectMetadata).map(ossM -> {
+
+ Map<String, Object> metaInfo = Maps.newHashMap();
+ metaInfo.putAll(ossM.getRawMetadata());
+ if (ossM.getUserMetadata() != null) {
+ metaInfo.putAll(ossM.getUserMetadata());
+ }
+
+ return new FileMeta()
+ .setLastModifiedTime(ossM.getLastModified())
+ .setLength(ossM.getContentLength())
+ .setMetaInfo(metaInfo);
+ });
+ } catch (OSSException oe) {
+ String errorCode = oe.getErrorCode();
+ if (NO_SUCH_KEY.equalsIgnoreCase(errorCode)) {
+ return Optional.empty();
+ }
+ ExceptionUtils.rethrow(oe);
+ }
+ return Optional.empty();
+ }
+
+ private static String parseFileName(FileLocation fileLocation) {
+ return String.format("%s/%s", fileLocation.getBucket(), fileLocation.getName());
+ }
+
+ void initOssClient(String endpoint, String bucket, String mode, String ak, String sk, String token) throws Exception {
+
+ log.info("[AliOssService] init OSS by config: endpoint={},bucket={},credentialType={},ak={},sk={},token={}", endpoint, bucket, mode, ak, sk, token);
+
+ if (StringUtils.isEmpty(bucket)) {
+ throw new IllegalArgumentException("'oms.storage.dfs.alioss.bucket' can't be empty, please create a bucket in the Aliyun OSS console and then configure it for PowerJob");
+ }
+
+ this.bucket = bucket;
+
+ CredentialsProvider credentialsProvider;
+ CredentialType credentialType = CredentialType.parse(mode);
+ switch (credentialType) {
+ case PWD:
+ credentialsProvider = new DefaultCredentialProvider(ak, sk, token);
+ break;
+ case SYSTEM_PROPERTY:
+ credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider();
+ break;
+ default:
+ credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
+ }
+
+ this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider);
+ log.info("[AliOssService] initialize OSS successfully!");
+ }
+
+ @Override
+ public void cleanExpiredFiles(String bucket, int days) {
+ /*
+ Alibaba Cloud OSS provides built-in lifecycle management; please configure expiration rules there (https://help.aliyun.com/zh/oss/user-guide/overview-54).
+ Implementing the cleanup in code would only waste server resources, so this method is intentionally left as a no-op.
+ */
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ oss.shutdown();
+ }
+
+ @Override
+ protected void init(ApplicationContext applicationContext) {
+ Environment environment = applicationContext.getEnvironment();
+
+ String endpoint = fetchProperty(environment, TYPE_ALI_OSS, KEY_ENDPOINT);
+ String bkt = fetchProperty(environment, TYPE_ALI_OSS, KEY_BUCKET);
+ String ct = fetchProperty(environment, TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE);
+ String ak = fetchProperty(environment, TYPE_ALI_OSS, KEY_AK);
+ String sk = fetchProperty(environment, TYPE_ALI_OSS, KEY_SK);
+ String token = fetchProperty(environment, TYPE_ALI_OSS, KEY_TOKEN);
+
+ try {
+ initOssClient(endpoint, bkt, ct, ak, sk, token);
+ } catch (Exception e) {
+ ExceptionUtils.rethrow(e);
+ }
+ }
+
+ @Getter
+ @AllArgsConstructor
+ enum CredentialType {
+ /**
+ * read credentials from environment variables
+ */
+ ENV("env"),
+ /**
+ * read credentials from system properties
+ */
+ SYSTEM_PROPERTY("sys"),
+ /**
+ * use the configured access key / secret key
+ */
+ PWD("pwd")
+ ;
+
+ private final String code;
+
+ /**
+ * parse credential type
+ * @param mode oms.storage.dfs.alioss.credential_type
+ * @return CredentialType
+ */
+ public static CredentialType parse(String mode) {
+
+ for (CredentialType credentialType : values()) {
+ if (StringUtils.equalsIgnoreCase(credentialType.code, mode)) {
+ return credentialType;
+ }
+ }
+
+ return PWD;
+ }
+
+ }
+
+ public static class AliOssCondition extends PropertyAndOneBeanCondition {
+
+ @Override
+ protected List<String> anyConfigKey() {
+ return Lists.newArrayList("oms.storage.dfs.alioss.endpoint");
+ }
+
+ @Override
+ protected Class<?> beanType() {
+ return DFsService.class;
+ }
+ }
+}
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/EmptyDFsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/EmptyDFsService.java
new file mode 100644
index 00000000..9f464898
--- /dev/null
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/EmptyDFsService.java
@@ -0,0 +1,59 @@
+package tech.powerjob.server.persistence.storage.impl;
+
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Conditional;
+import tech.powerjob.server.extension.dfs.*;
+import tech.powerjob.server.persistence.storage.AbstractDFsService;
+import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
+
+import javax.annotation.Priority;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * EmptyDFsService
+ *
+ * @author tjq
+ * @since 2023/7/30
+ */
+@Priority(value = Integer.MAX_VALUE)
+@Conditional(EmptyDFsService.EmptyCondition.class)
+public class EmptyDFsService extends AbstractDFsService {
+
+
+ @Override
+ public void store(StoreRequest storeRequest) throws IOException {
+ }
+
+ @Override
+ public void download(DownloadRequest downloadRequest) throws IOException {
+ }
+
+ @Override
+ public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
+ return Optional.empty();
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ }
+
+ @Override
+ protected void init(ApplicationContext applicationContext) {
+
+ }
+
+
+ public static class EmptyCondition extends PropertyAndOneBeanCondition {
+ @Override
+ protected List<String> anyConfigKey() {
+ return null;
+ }
+
+ @Override
+ protected Class<?> beanType() {
+ return DFsService.class;
+ }
+ }
+}
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java
new file mode 100644
index 00000000..c5c4f70c
--- /dev/null
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/GridFsService.java
@@ -0,0 +1,174 @@
+package tech.powerjob.server.persistence.storage.impl;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.mongodb.ConnectionString;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.GridFSBuckets;
+import com.mongodb.client.gridfs.GridFSDownloadStream;
+import com.mongodb.client.gridfs.GridFSFindIterable;
+import com.mongodb.client.gridfs.model.GridFSFile;
+import com.mongodb.client.model.Filters;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateUtils;
+import org.bson.conversions.Bson;
+import org.bson.types.ObjectId;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.core.env.Environment;
+import tech.powerjob.server.extension.dfs.*;
+import tech.powerjob.server.persistence.storage.AbstractDFsService;
+import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
+
+import javax.annotation.Priority;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Uses MongoDB GridFS as the underlying storage
+ * Configuration example: oms.storage.dfs.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority
+ *
+ * @author tjq
+ * @since 2023/7/28
+ */
+@Slf4j
+@Priority(value = Integer.MAX_VALUE - 10)
+@Conditional(GridFsService.GridFsCondition.class)
+public class GridFsService extends AbstractDFsService {
+
+ private MongoClient mongoClient;
+ private MongoDatabase db;
+ private final Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap();
+ private static final String TYPE_MONGO = "mongodb";
+
+ private static final String KEY_URI = "uri";
+
+ private static final String SPRING_MONGO_DB_CONFIG_KEY = "spring.data.mongodb.uri";
+
+ @Override
+ public void store(StoreRequest storeRequest) throws IOException {
+ GridFSBucket bucket = getBucket(storeRequest.getFileLocation().getBucket());
+ try (BufferedInputStream bis = new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath()))) {
+ bucket.uploadFromStream(storeRequest.getFileLocation().getName(), bis);
+ }
+ }
+
+ @Override
+ public void download(DownloadRequest downloadRequest) throws IOException {
+ GridFSBucket bucket = getBucket(downloadRequest.getFileLocation().getBucket());
+ FileUtils.forceMkdirParent(downloadRequest.getTarget());
+ try (GridFSDownloadStream gis = bucket.openDownloadStream(downloadRequest.getFileLocation().getName());
+ BufferedOutputStream bos = new BufferedOutputStream(Files.newOutputStream(downloadRequest.getTarget().toPath()))
+ ) {
+ byte[] buffer = new byte[1024];
+ int bytes = 0;
+ while ((bytes = gis.read(buffer)) != -1) {
+ bos.write(buffer, 0, bytes);
+ }
+ bos.flush();
+ }
+ }
+
+ @Override
+ public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
+ GridFSBucket bucket = getBucket(fileLocation.getBucket());
+ GridFSFindIterable files = bucket.find(Filters.eq("filename", fileLocation.getName()));
+ GridFSFile first = files.first();
+ if (first == null) {
+ return Optional.empty();
+ }
+ return Optional.of(new FileMeta()
+ .setLength(first.getLength())
+ .setLastModifiedTime(first.getUploadDate())
+ .setMetaInfo(first.getMetadata()));
+ }
+
+ @Override
+ public void cleanExpiredFiles(String bucketName, int days) {
+ Stopwatch sw = Stopwatch.createStarted();
+
+ Date date = DateUtils.addDays(new Date(), -days);
+ GridFSBucket bucket = getBucket(bucketName);
+ Bson filter = Filters.lt("uploadDate", date);
+
+ // Think deleting in a loop performs badly? Have a look at the official implementation first: org.springframework.data.mongodb.gridfs.GridFsTemplate.delete
+ bucket.find(filter).forEach(gridFSFile -> {
+ ObjectId objectId = gridFSFile.getObjectId();
+ try {
+ bucket.delete(objectId);
+ log.info("[GridFsService] deleted {}#{}", bucketName, objectId);
+ }catch (Exception e) {
+ log.error("[GridFsService] deleted {}#{} failed.", bucketName, objectId, e);
+ }
+ });
+ log.info("[GridFsService] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop());
+ }
+
+ private GridFSBucket getBucket(String bucketName) {
+ return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
+ }
+
+ private String parseMongoUri(Environment environment) {
+ // prefer the new configuration key
+ String uri = fetchProperty(environment, TYPE_MONGO, KEY_URI);
+ if (StringUtils.isNotEmpty(uri)) {
+ return uri;
+ }
+ // for compatibility with versions before 4.3.3, fall back to the Spring Data MongoDB configuration
+ return environment.getProperty(SPRING_MONGO_DB_CONFIG_KEY);
+ }
+
+ void initMongo(String uri) {
+ log.info("[GridFsService] mongoDB uri: {}", uri);
+ if (StringUtils.isEmpty(uri)) {
+ log.warn("[GridFsService] uri is empty, GridFsService is off now!");
+ return;
+ }
+
+ ConnectionString connectionString = new ConnectionString(uri);
+ mongoClient = MongoClients.create(connectionString);
+
+ if (StringUtils.isEmpty(connectionString.getDatabase())) {
+ log.warn("[GridFsService] can't find database info from uri, will use [powerjob] as default, please make sure you have created the database 'powerjob'");
+ }
+
+ db = mongoClient.getDatabase(Optional.ofNullable(connectionString.getDatabase()).orElse("powerjob"));
+
+ log.info("[GridFsService] initialize MongoDB and GridFS successfully, will use mongodb GridFs as storage layer.");
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ mongoClient.close();
+ }
+
+ @Override
+ protected void init(ApplicationContext applicationContext) {
+ String uri = parseMongoUri(applicationContext.getEnvironment());
+ initMongo(uri);
+ }
+
+ public static class GridFsCondition extends PropertyAndOneBeanCondition {
+ @Override
+ protected List<String> anyConfigKey() {
+ return Lists.newArrayList("spring.data.mongodb.uri", "oms.storage.dfs.mongodb.uri");
+ }
+
+ @Override
+ protected Class<?> beanType() {
+ return DFsService.class;
+ }
+ }
+}
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java
new file mode 100644
index 00000000..ca8348a8
--- /dev/null
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java
@@ -0,0 +1,344 @@
+package tech.powerjob.server.persistence.storage.impl;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.time.DateUtils;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.core.env.Environment;
+import tech.powerjob.common.serialize.JsonUtils;
+import tech.powerjob.common.utils.CommonUtils;
+import tech.powerjob.server.common.constants.SwitchableStatus;
+import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
+import tech.powerjob.server.extension.dfs.*;
+import tech.powerjob.server.persistence.storage.AbstractDFsService;
+
+import javax.annotation.Priority;
+import javax.sql.DataSource;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.sql.*;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Storage backed by MySQL-compatible databases
+ * PS1. Uploading large files may hit the max_allowed_packet limit; it can be raised on the database side with: set global max_allowed_packet = 500*1024*1024
+ * PS2. Officially tested against MySQL only; please verify other databases yourself before using them!
+ * PS3. Databases are not suited to large-scale file storage. This extension targets simple scenarios; for heavier workloads choose another storage option (OSS, MongoDB, etc.)
+ * ********************* configuration items *********************
+ * oms.storage.dfs.mysql_series.driver
+ * oms.storage.dfs.mysql_series.url
+ * oms.storage.dfs.mysql_series.username
+ * oms.storage.dfs.mysql_series.password
+ * oms.storage.dfs.mysql_series.auto_create_table
+ * oms.storage.dfs.mysql_series.table_name
+ *
+ * @author tjq
+ * @since 2023/8/9
+ */
+@Slf4j
+@Priority(value = Integer.MAX_VALUE - 2)
+@Conditional(MySqlSeriesDfsService.MySqlSeriesCondition.class)
+public class MySqlSeriesDfsService extends AbstractDFsService {
+
+ private DataSource dataSource;
+
+ private static final String TYPE_MYSQL = "mysql_series";
+
+ /**
+ * JDBC driver class; for MySQL 8 use com.mysql.cj.jdbc.Driver
+ */
+ private static final String KEY_DRIVER_NAME = "driver";
+ /**
+ * JDBC URL, e.g. jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
+ */
+ private static final String KEY_URL = "url";
+ /**
+ * database username, e.g. root
+ */
+ private static final String KEY_USERNAME = "username";
+ /**
+ * database password
+ */
+ private static final String KEY_PASSWORD = "password";
+ /**
+ * whether to create the table automatically
+ */
+ private static final String KEY_AUTO_CREATE_TABLE = "auto_create_table";
+ /**
+ * table name
+ */
+ private static final String KEY_TABLE_NAME = "table_name";
+
+ /* ********************* SQL region ********************* */
+
+ private static final String DEFAULT_TABLE_NAME = "powerjob_files";
+
+ private static final String CREATE_TABLE_SQL = "CREATE TABLE\n" +
+ "IF\n" +
+ "\tNOT EXISTS %s (\n" +
+ "\t\t`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT 'ID',\n" +
+ "\t\t`bucket` VARCHAR ( 255 ) NOT NULL COMMENT '分桶',\n" +
+ "\t\t`name` VARCHAR ( 255 ) NOT NULL COMMENT '文件名称',\n" +
+ "\t\t`version` VARCHAR ( 255 ) NOT NULL COMMENT '版本',\n" +
+ "\t\t`meta` VARCHAR ( 255 ) COMMENT '元数据',\n" +
+ "\t\t`length` BIGINT NOT NULL COMMENT '长度',\n" +
+ "\t\t`status` INT NOT NULL COMMENT '状态',\n" +
+ "\t\t`data` LONGBLOB NOT NULL COMMENT '文件内容',\n" +
+ "\t\t`extra` VARCHAR ( 255 ) COMMENT '其他信息',\n" +
+ "\t\t`gmt_create` DATETIME NOT NULL COMMENT '创建时间',\n" +
+ "\t\t`gmt_modified` DATETIME COMMENT '更新时间',\n" +
+ "\tPRIMARY KEY ( id ) \n" +
+ "\t);";
+
+ private static final String INSERT_SQL = "insert into %s(bucket, name, version, meta, length, status, data, extra, gmt_create, gmt_modified) values (?,?,?,?,?,?,?,?,?,?);";
+
+ private static final String DELETE_SQL = "DELETE FROM %s ";
+
+ private static final String QUERY_FULL_SQL = "select * from %s";
+
+ private static final String QUERY_META_SQL = "select bucket, name, version, meta, length, status, extra, gmt_create, gmt_modified from %s";
+
+
+ private void deleteByLocation(FileLocation fileLocation) {
+ String dSQLPrefix = fullSQL(DELETE_SQL);
+ String dSQL = dSQLPrefix.concat(whereSQL(fileLocation));
+ executeDelete(dSQL);
+ }
+
+ private void executeDelete(String sql) {
+ try (Connection con = dataSource.getConnection()) {
+ con.createStatement().executeUpdate(sql);
+ } catch (Exception e) {
+ log.error("[MySqlSeriesDfsService] executeDelete failed, sql: {}", sql);
+ }
+ }
+
+ @Override
+ public void store(StoreRequest storeRequest) throws IOException {
+
+ Stopwatch sw = Stopwatch.createStarted();
+ String insertSQL = fullSQL(INSERT_SQL);
+
+ FileLocation fileLocation = storeRequest.getFileLocation();
+
+ // overwrite semantics: delete any existing record before writing
+ deleteByLocation(fileLocation);
+
+ Map<String, Object> meta = Maps.newHashMap();
+ meta.put("_local_file_path_", storeRequest.getLocalFile().getAbsolutePath());
+
+ Date date = new Date(System.currentTimeMillis());
+
+ try (Connection con = dataSource.getConnection()) {
+ PreparedStatement pst = con.prepareStatement(insertSQL);
+
+ pst.setString(1, fileLocation.getBucket());
+ pst.setString(2, fileLocation.getName());
+ pst.setString(3, "mu");
+ pst.setString(4, JsonUtils.toJSONString(meta));
+ pst.setLong(5, storeRequest.getLocalFile().length());
+ pst.setInt(6, SwitchableStatus.ENABLE.getV());
+ pst.setBlob(7, new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath())));
+ pst.setString(8, null);
+ pst.setDate(9, date);
+ pst.setDate(10, date);
+
+ pst.execute();
+
+ log.info("[MySqlSeriesDfsService] store [{}] successfully, cost: {}", fileLocation, sw);
+
+ } catch (Exception e) {
+ log.error("[MySqlSeriesDfsService] store [{}] failed!", fileLocation);
+ ExceptionUtils.rethrow(e);
+ }
+ }
+
+ @Override
+ public void download(DownloadRequest downloadRequest) throws IOException {
+
+ Stopwatch sw = Stopwatch.createStarted();
+ String querySQL = fullSQL(QUERY_FULL_SQL);
+
+ FileLocation fileLocation = downloadRequest.getFileLocation();
+
+ FileUtils.forceMkdirParent(downloadRequest.getTarget());
+
+ try (Connection con = dataSource.getConnection()) {
+
+ ResultSet resultSet = con.createStatement().executeQuery(querySQL.concat(whereSQL(fileLocation)));
+
+ boolean exist = resultSet.next();
+
+ if (!exist) {
+ log.warn("[MySqlSeriesDfsService] download file[{}] failed due to not exits!", fileLocation);
+ return;
+ }
+
+ Blob dataBlob = resultSet.getBlob("data");
+ FileUtils.copyInputStreamToFile(new BufferedInputStream(dataBlob.getBinaryStream()), downloadRequest.getTarget());
+
+ log.info("[MySqlSeriesDfsService] download [{}] successfully, cost: {}", fileLocation, sw);
+
+ } catch (Exception e) {
+ log.error("[MySqlSeriesDfsService] download file [{}] failed!", fileLocation, e);
+ ExceptionUtils.rethrow(e);
+ }
+
+ }
+
+ @Override
+ public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
+
+ String querySQL = fullSQL(QUERY_META_SQL);
+
+ try (Connection con = dataSource.getConnection()) {
+
+ ResultSet resultSet = con.createStatement().executeQuery(querySQL.concat(whereSQL(fileLocation)));
+
+ boolean exist = resultSet.next();
+
+ if (!exist) {
+ return Optional.empty();
+ }
+
+ FileMeta fileMeta = new FileMeta()
+ .setLength(resultSet.getLong("length"))
+ .setLastModifiedTime(resultSet.getDate("gmt_modified"))
+ .setMetaInfo(JsonUtils.parseMap(resultSet.getString("meta")));
+ return Optional.of(fileMeta);
+
+ } catch (Exception e) {
+ log.error("[MySqlSeriesDfsService] fetchFileMeta [{}] failed!", fileLocation);
+ ExceptionUtils.rethrow(e);
+ }
+
+ return Optional.empty();
+ }
+
+ @Override
+ public void cleanExpiredFiles(String bucket, int days) {
+
+ // Although server-side cleanup is provided here, it is still strongly recommended to configure the cleanup as a scheduled event directly in the database!!!
+
+ String dSQLPrefix = fullSQL(DELETE_SQL);
+ final long targetTs = DateUtils.addDays(new Date(System.currentTimeMillis()), -days).getTime();
+ final String targetDeleteTime = CommonUtils.formatTime(targetTs);
+ log.info("[MySqlSeriesDfsService] start to cleanExpiredFiles, targetDeleteTime: {}", targetDeleteTime);
+ String fSQL = dSQLPrefix.concat(String.format(" where gmt_modified < '%s'", targetDeleteTime));
+ log.info("[MySqlSeriesDfsService] cleanExpiredFiles SQL: {}", fSQL);
+ executeDelete(fSQL);
+ }
+
+ @Override
+ protected void init(ApplicationContext applicationContext) {
+
+ Environment env = applicationContext.getEnvironment();
+
+ MySQLProperty mySQLProperty = new MySQLProperty()
+ .setDriver(fetchProperty(env, TYPE_MYSQL, KEY_DRIVER_NAME))
+ .setUrl(fetchProperty(env, TYPE_MYSQL, KEY_URL))
+ .setUsername(fetchProperty(env, TYPE_MYSQL, KEY_USERNAME))
+ .setPassword(fetchProperty(env, TYPE_MYSQL, KEY_PASSWORD))
+ .setAutoCreateTable(Boolean.TRUE.toString().equalsIgnoreCase(fetchProperty(env, TYPE_MYSQL, KEY_AUTO_CREATE_TABLE)))
+ ;
+
+ try {
+ initDatabase(mySQLProperty);
+ initTable(mySQLProperty);
+ } catch (Exception e) {
+ log.error("[MySqlSeriesDfsService] init datasource failed!", e);
+ ExceptionUtils.rethrow(e);
+ }
+ }
+
+ void initDatabase(MySQLProperty property) {
+
+ log.info("[MySqlSeriesDfsService] init datasource by config: {}", property);
+
+ HikariConfig config = new HikariConfig();
+
+ config.setDriverClassName(property.driver);
+ config.setJdbcUrl(property.url);
+ config.setUsername(property.username);
+ config.setPassword(property.password);
+
+ config.setAutoCommit(true);
+ // minimum number of idle connections in the pool
+ config.setMinimumIdle(2);
+ // maximum number of connections in the pool
+ config.setMaximumPoolSize(32);
+
+ dataSource = new HikariDataSource(config);
+ }
+
+ void initTable(MySQLProperty property) throws Exception {
+
+ if (property.autoCreateTable) {
+
+ String createTableSQL = fullSQL(CREATE_TABLE_SQL);
+
+ log.info("[MySqlSeriesDfsService] use create table SQL: {}", createTableSQL);
+ try (Connection connection = dataSource.getConnection()) {
+ connection.createStatement().execute(createTableSQL);
+ log.info("[MySqlSeriesDfsService] auto create table successfully!");
+ }
+ }
+ }
+
+ private String fullSQL(String sql) {
+ return String.format(sql, parseTableName());
+ }
+
+ private String parseTableName() {
+ // applicationContext may be null in local unit tests; fall back to the default table name
+ if (applicationContext == null) {
+ return DEFAULT_TABLE_NAME;
+ }
+ String tableName = fetchProperty(applicationContext.getEnvironment(), TYPE_MYSQL, KEY_TABLE_NAME);
+ return StringUtils.isEmpty(tableName) ? DEFAULT_TABLE_NAME : tableName;
+ }
+
+ private static String whereSQL(FileLocation fileLocation) {
+ return String.format(" where bucket='%s' AND name='%s' ", fileLocation.getBucket(), fileLocation.getName());
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ }
+
+ @Data
+ @Accessors(chain = true)
+ static class MySQLProperty {
+ private String driver;
+ private String url;
+ private String username;
+ private String password;
+
+ private boolean autoCreateTable;
+ }
+
+ public static class MySqlSeriesCondition extends PropertyAndOneBeanCondition {
+ @Override
+ protected List<String> anyConfigKey() {
+ return Lists.newArrayList("oms.storage.dfs.mysql_series.url");
+ }
+
+ @Override
+ protected Class<?> beanType() {
+ return DFsService.class;
+ }
+ }
+}
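Regarding the max_allowed_packet note in the class comment: the limit can be raised persistently in my.cnf, or at runtime with the statement quoted there. A hedged JDBC sketch of the runtime variant follows; it requires a privileged account, and the value is reset when the MySQL server restarts.

    import javax.sql.DataSource;
    import java.sql.Connection;
    import java.sql.Statement;

    public class MaxAllowedPacketSketch {
        static void raiseLimit(DataSource dataSource) throws Exception {
            try (Connection con = dataSource.getConnection();
                 Statement st = con.createStatement()) {
                // same statement as suggested in the MySqlSeriesDfsService class comment
                st.execute("SET GLOBAL max_allowed_packet = 500*1024*1024");
            }
        }
    }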
diff --git a/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/AbstractDfsServiceTest.java b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/AbstractDfsServiceTest.java
new file mode 100644
index 00000000..25974864
--- /dev/null
+++ b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/AbstractDfsServiceTest.java
@@ -0,0 +1,88 @@
+package tech.powerjob.server.persistence.storage.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Test;
+import tech.powerjob.common.serialize.JsonUtils;
+import tech.powerjob.server.common.utils.OmsFileUtils;
+import tech.powerjob.server.extension.dfs.*;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * AbstractDfsServiceTest
+ *
+ * @author tjq
+ * @since 2023/7/30
+ */
+@Slf4j
+public abstract class AbstractDfsServiceTest {
+
+ private static final String BUCKET = "pj_test";
+
+ abstract protected Optional<DFsService> fetchService();
+
+ @Test
+ void testBaseFileOperation() throws Exception {
+
+ Optional<DFsService> aliOssServiceOpt = fetchService();
+ if (!aliOssServiceOpt.isPresent()) {
+ return;
+ }
+
+ DFsService aliOssService = aliOssServiceOpt.get();
+
+ String content = "wlcgyqsl".concat(String.valueOf(ThreadLocalRandom.current().nextLong()));
+
+ String temporarySourcePath = OmsFileUtils.genTemporaryWorkPath() + "source.txt";
+ String temporaryDownloadPath = OmsFileUtils.genTemporaryWorkPath() + "download.txt";
+
+ log.info("[testBaseFileOperation] temporarySourcePath: {}", temporarySourcePath);
+ File sourceFile = new File(temporarySourcePath);
+ FileUtils.forceMkdirParent(sourceFile);
+ OmsFileUtils.string2File(content, sourceFile);
+
+ FileLocation fileLocation = new FileLocation().setBucket(BUCKET).setName(String.format("test_%d.txt", ThreadLocalRandom.current().nextLong()));
+
+ StoreRequest storeRequest = new StoreRequest()
+ .setFileLocation(fileLocation)
+ .setLocalFile(sourceFile);
+
+ // store
+ aliOssService.store(storeRequest);
+
+ // fetch meta
+ Optional<FileMeta> metaOpt = aliOssService.fetchFileMeta(fileLocation);
+ assert metaOpt.isPresent();
+
+ log.info("[testBaseFileOperation] file meta: {}", JsonUtils.toJSONString(metaOpt.get()));
+
+ // download
+ log.info("[testBaseFileOperation] temporaryDownloadPath: {}", temporaryDownloadPath);
+ File downloadFile = new File(temporaryDownloadPath);
+ DownloadRequest downloadRequest = new DownloadRequest()
+ .setFileLocation(fileLocation)
+ .setTarget(downloadFile);
+ aliOssService.download(downloadRequest);
+
+ String downloadFileContent = FileUtils.readFileToString(downloadFile, StandardCharsets.UTF_8);
+ log.info("[testBaseFileOperation] download content: {}", downloadFileContent);
+ assert downloadFileContent.equals(content);
+
+ // scheduled cleanup: just invoke it, no assertions
+ aliOssService.cleanExpiredFiles(BUCKET, 3);
+ }
+
+ @Test
+ void testFileNotExist() throws Exception {
+ Optional<DFsService> aliOssServiceOpt = fetchService();
+ if (!aliOssServiceOpt.isPresent()) {
+ return;
+ }
+ Optional<FileMeta> metaOpt = aliOssServiceOpt.get().fetchFileMeta(new FileLocation().setBucket("tjq").setName("yhz"));
+ assert !metaOpt.isPresent();
+ }
+}
diff --git a/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/AliOssServiceTest.java b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/AliOssServiceTest.java
new file mode 100644
index 00000000..94167731
--- /dev/null
+++ b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/AliOssServiceTest.java
@@ -0,0 +1,49 @@
+package tech.powerjob.server.persistence.storage.impl;
+
+import com.aliyun.oss.common.utils.AuthUtils;
+import com.aliyun.oss.common.utils.StringUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import tech.powerjob.server.extension.dfs.DFsService;
+
+import java.util.Optional;
+
+
+/**
+ * test AliOSS
+ *
+ * @author tjq
+ * @since 2023/7/30
+ */
+@Slf4j
+class AliOssServiceTest extends AbstractDfsServiceTest {
+
+ private static final String BUCKET = "power-job";
+
+ /**
+ * This test depends on Alibaba Cloud credentials. To keep the unit tests passing in other environments, it is skipped when no configuration is found
+ * @return AliOssService
+ */
+ @Override
+ protected Optional<DFsService> fetchService() {
+ String accessKeyId = StringUtils.trim(System.getenv(AuthUtils.ACCESS_KEY_ENV_VAR));
+ String secretAccessKey = StringUtils.trim(System.getenv(AuthUtils.SECRET_KEY_ENV_VAR));
+
+ String bucket = Optional.ofNullable(System.getenv("POWERJOB_OSS_BUEKCT")).orElse(BUCKET);
+
+ log.info("[AliOssServiceTest] ak: {}, sk: {}", accessKeyId, secretAccessKey);
+
+ if (org.apache.commons.lang3.StringUtils.isAnyEmpty(accessKeyId, secretAccessKey)) {
+ return Optional.empty();
+ }
+
+ try {
+ AliOssService aliOssService = new AliOssService();
+ aliOssService.initOssClient("oss-cn-beijing.aliyuncs.com", bucket, AliOssService.CredentialType.ENV.getCode(), null, null, null);
+ return Optional.of(aliOssService);
+ } catch (Exception e) {
+ ExceptionUtils.rethrow(e);
+ }
+ return Optional.empty();
+ }
+}
\ No newline at end of file
diff --git a/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/GridFsServiceTest.java b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/GridFsServiceTest.java
new file mode 100644
index 00000000..ba0d7c2e
--- /dev/null
+++ b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/GridFsServiceTest.java
@@ -0,0 +1,32 @@
+package tech.powerjob.server.persistence.storage.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import tech.powerjob.server.common.utils.TestUtils;
+import tech.powerjob.server.extension.dfs.DFsService;
+
+import java.util.Optional;
+
+/**
+ * test GridFS
+ *
+ * @author tjq
+ * @since 2023/7/30
+ */
+@Slf4j
+class GridFsServiceTest extends AbstractDfsServiceTest {
+
+ @Override
+ protected Optional<DFsService> fetchService() {
+
+ Object mongoUri = TestUtils.fetchTestConfig().get(TestUtils.KEY_MONGO_URI);
+
+ if (mongoUri == null) {
+ log.info("[GridFsServiceTest] mongoUri is null, skip load!");
+ return Optional.empty();
+ }
+
+ GridFsService gridFsService = new GridFsService();
+ gridFsService.initMongo(String.valueOf(mongoUri));
+ return Optional.of(gridFsService);
+ }
+}
\ No newline at end of file
diff --git a/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsServiceTest.java b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsServiceTest.java
new file mode 100644
index 00000000..d14116fe
--- /dev/null
+++ b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsServiceTest.java
@@ -0,0 +1,42 @@
+package tech.powerjob.server.persistence.storage.impl;
+
+import tech.powerjob.common.utils.NetUtils;
+import tech.powerjob.server.extension.dfs.DFsService;
+
+import java.util.Optional;
+
+/**
+ * MySqlSeriesDfsServiceTest
+ *
+ * @author tjq
+ * @since 2023/8/10
+ */
+class MySqlSeriesDfsServiceTest extends AbstractDfsServiceTest {
+
+ @Override
+ protected Optional<DFsService> fetchService() {
+
+ boolean dbAvailable = NetUtils.checkIpPortAvailable("127.0.0.1", 3306);
+ if (dbAvailable) {
+ MySqlSeriesDfsService mySqlSeriesDfsService = new MySqlSeriesDfsService();
+
+ try {
+
+ MySqlSeriesDfsService.MySQLProperty mySQLProperty = new MySqlSeriesDfsService.MySQLProperty()
+ .setDriver("com.mysql.cj.jdbc.Driver")
+ .setUrl("jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
+ .setUsername("root")
+ .setAutoCreateTable(true)
+ .setPassword("No1Bug2Please3!");
+ mySqlSeriesDfsService.initDatabase(mySQLProperty);
+ mySqlSeriesDfsService.initTable(mySQLProperty);
+
+ return Optional.of(mySqlSeriesDfsService);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ return Optional.empty();
+ }
+}
\ No newline at end of file
diff --git a/powerjob-server/powerjob-server-remote/pom.xml b/powerjob-server/powerjob-server-remote/pom.xml
index e5040c98..6a43bc30 100644
--- a/powerjob-server/powerjob-server-remote/pom.xml
+++ b/powerjob-server/powerjob-server-remote/pom.xml
@@ -5,7 +5,7 @@
powerjob-server
tech.powerjob
- 4.3.3
+ 4.3.4
../pom.xml
4.0.0
diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java
index b2a86791..e6febdfe 100644
--- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java
+++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java
@@ -60,9 +60,11 @@ public class ServerElectionService {
final String currentServer = request.getCurrentServer();
// 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功
Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
- if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
- log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer);
- return currentServer;
+ if (localProtocolInfoOpt.isPresent()) {
+ if (localProtocolInfoOpt.get().getExternalAddress().equals(currentServer) || localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
+ log.info("[ServerElection] this server[{}] is worker[appId={}]'s current server, skip check", currentServer, request.getAppId());
+ return currentServer;
+ }
}
}
return getServer0(request);
@@ -110,13 +112,13 @@ public class ServerElectionService {
// 篡位,如果本机存在协议,则作为Server调度该 worker
final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
if (targetProtocolInfo != null) {
- // 注意,写入 AppInfoDO#currentServer 的永远是 default 的地址,仅在返回的时候特殊处理为协议地址
+ // Note: what is written to AppInfoDO#currentServer is always the bind address of the default protocol; it is only translated to the protocol-specific address when returned
appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
appInfo.setGmtModified(new Date());
appInfoRepository.saveAndFlush(appInfo);
log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
- return targetProtocolInfo.getAddress();
+ return targetProtocolInfo.getExternalAddress();
}
}catch (Exception e) {
log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
@@ -129,10 +131,10 @@ public class ServerElectionService {
/**
* 判断指定server是否存活
- * @param serverAddress 需要检测的server地址
+ * @param serverAddress server address to check (the bound intranet address)
* @param downServerCache 缓存,防止多次发送PING(这个QPS其实还蛮爆表的...)
* @param protocol 协议,用于返回指定的地址
- * @return null or address
+ * @return null or address (the external address)
*/
private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {
@@ -156,9 +158,10 @@ public class ServerElectionService {
final JSONObject protocolInfo = JsonUtils.parseObject(response.getData(), JSONObject.class).getJSONObject(protocol);
if (protocolInfo != null) {
downServerCache.remove(serverAddress);
- final String protocolAddress = protocolInfo.toJavaObject(ProtocolInfo.class).getAddress();
- log.info("[ServerElection] server[{}] is active, it will be the master, final protocol address={}", serverAddress, protocolAddress);
- return protocolAddress;
+ ProtocolInfo remoteProtocol = protocolInfo.toJavaObject(ProtocolInfo.class);
+ log.info("[ServerElection] server[{}] is active, it will be the master, final protocol={}", serverAddress, remoteProtocol);
+ // During the 4.3.3 -> 4.3.4 upgrade, servers that have not been upgraded yet do not expose externalAddress, so fall back to address for compatibility
+ return Optional.ofNullable(remoteProtocol.getExternalAddress()).orElse(remoteProtocol.getAddress());
} else {
log.warn("[ServerElection] server[{}] is active but don't have target protocol", serverAddress);
}
diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java
index 0802e811..1832a7e0 100644
--- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java
+++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/ProtocolInfo.java
@@ -3,6 +3,9 @@ package tech.powerjob.server.remote.transporter;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import tech.powerjob.common.PowerJobDKey;
+import tech.powerjob.common.utils.PropertyUtils;
+import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.transporter.Transporter;
/**
@@ -20,11 +23,32 @@ public class ProtocolInfo {
private String address;
+ /**
+ * External address. In NAT-style deployments this is the address that must be pushed down to workers
+ */
+ private String externalAddress;
+
private transient Transporter transporter;
- public ProtocolInfo(String protocol, String address, Transporter transporter) {
+ /**
+ * Required for serialization: the no-arg constructor must exist. Never delete it!
+ */
+ public ProtocolInfo() {
+ }
+
+ public ProtocolInfo(String protocol, String host, int port, Transporter transporter) {
this.protocol = protocol;
- this.address = address;
this.transporter = transporter;
+
+ this.address = Address.toFullAddress(host, port);
+
+ // resolve the external address
+ String externalAddress = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, host);
+
+ // Since different protocols may in theory use different ports, the server needs a per-protocol port mapping, configured as powerjob.network.external.port.${protocol}, e.g. powerjob.network.external.port.http
+ String externalPortByProtocolKey = PowerJobDKey.NT_EXTERNAL_PORT.concat(".").concat(protocol.toLowerCase());
+ // Most users run only one protocol, so the generic key is accepted here as a fallback to reduce support questions and improve usability (with multiple protocols, only the forwarded protocol can communicate successfully)
+ String externalPort = PropertyUtils.readProperty(externalPortByProtocolKey, PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(port)));
+ this.externalAddress = Address.toFullAddress(externalAddress, Integer.parseInt(externalPort));
}
}
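To make the fallback described in the constructor comments concrete, here is a minimal sketch of the same resolution order (protocol-specific key, then the generic key, then the bind port), reusing only the helpers already referenced in this file. The -D values in the comment are placeholders, not shipped defaults.

import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.utils.PropertyUtils;
import tech.powerjob.remote.framework.base.Address;

class ExternalAddressResolutionSketch {

    // e.g. a server launched with (hypothetical values):
    //   -Dpowerjob.network.external.address=203.0.113.10
    //   -Dpowerjob.network.external.port.http=17700
    // advertises 203.0.113.10:17700 for the http protocol and falls back to the bind
    // host/port for any protocol without an external mapping.
    static String resolveExternalAddress(String protocol, String bindHost, int bindPort) {
        String externalHost = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, bindHost);
        String protocolSpecificPortKey = PowerJobDKey.NT_EXTERNAL_PORT + "." + protocol.toLowerCase();
        String externalPort = PropertyUtils.readProperty(protocolSpecificPortKey,
                PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(bindPort)));
        return Address.toFullAddress(externalHost, Integer.parseInt(externalPort));
    }
}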
diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java
index d5d2f624..331a83ad 100644
--- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java
+++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java
@@ -120,7 +120,7 @@ public class PowerTransportService implements TransportService, InitializingBean
log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address);
this.engines.add(re);
- this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter()));
+ this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.getHost(), address.getPort(), engineOutput.getTransporter()));
}
@Override
diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java
index 2cbef095..62e4d320 100644
--- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java
+++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java
@@ -2,12 +2,11 @@ package tech.powerjob.server.remote.worker;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.DispatchStrategy;
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.server.common.module.WorkerInfo;
-import tech.powerjob.server.extension.WorkerFilter;
+import tech.powerjob.server.remote.worker.filter.WorkerFilter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/workerfilter/DesignatedWorkerFilter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java
similarity index 90%
rename from powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/workerfilter/DesignatedWorkerFilter.java
rename to powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java
index 0cb98396..c03a9f9f 100644
--- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/workerfilter/DesignatedWorkerFilter.java
+++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java
@@ -1,4 +1,4 @@
-package tech.powerjob.server.extension.defaultimpl.workerfilter;
+package tech.powerjob.server.remote.worker.filter;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
@@ -6,7 +6,6 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.module.WorkerInfo;
-import tech.powerjob.server.extension.WorkerFilter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import java.util.Set;
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/workerfilter/DisconnectedWorkerFilter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DisconnectedWorkerFilter.java
similarity index 86%
rename from powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/workerfilter/DisconnectedWorkerFilter.java
rename to powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DisconnectedWorkerFilter.java
index de87a910..5cdfb9a8 100644
--- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/workerfilter/DisconnectedWorkerFilter.java
+++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DisconnectedWorkerFilter.java
@@ -1,6 +1,5 @@
-package tech.powerjob.server.extension.defaultimpl.workerfilter;
+package tech.powerjob.server.remote.worker.filter;
-import tech.powerjob.server.extension.WorkerFilter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.common.module.WorkerInfo;
import lombok.extern.slf4j.Slf4j;
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/workerfilter/SystemMetricsWorkerFilter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/SystemMetricsWorkerFilter.java
similarity index 88%
rename from powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/workerfilter/SystemMetricsWorkerFilter.java
rename to powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/SystemMetricsWorkerFilter.java
index 0402604d..2270189a 100644
--- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/workerfilter/SystemMetricsWorkerFilter.java
+++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/SystemMetricsWorkerFilter.java
@@ -1,7 +1,6 @@
-package tech.powerjob.server.extension.defaultimpl.workerfilter;
+package tech.powerjob.server.remote.worker.filter;
import tech.powerjob.common.model.SystemMetrics;
-import tech.powerjob.server.extension.WorkerFilter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.common.module.WorkerInfo;
import lombok.extern.slf4j.Slf4j;
diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/WorkerFilter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/WorkerFilter.java
similarity index 91%
rename from powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/WorkerFilter.java
rename to powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/WorkerFilter.java
index 5df4214d..35e41586 100644
--- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/WorkerFilter.java
+++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/WorkerFilter.java
@@ -1,4 +1,4 @@
-package tech.powerjob.server.extension;
+package tech.powerjob.server.remote.worker.filter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.common.module.WorkerInfo;
diff --git a/powerjob-server/powerjob-server-starter/pom.xml b/powerjob-server/powerjob-server-starter/pom.xml
index d1a954dc..d7b40670 100644
--- a/powerjob-server/powerjob-server-starter/pom.xml
+++ b/powerjob-server/powerjob-server-starter/pom.xml
@@ -5,7 +5,7 @@
powerjob-server
tech.powerjob
- 4.3.3
+ 4.3.4
../pom.xml
4.0.0
diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/TestController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/TestController.java
new file mode 100644
index 00000000..c7fea8be
--- /dev/null
+++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/TestController.java
@@ -0,0 +1,75 @@
+package tech.powerjob.server.web.controller;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.MapUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import tech.powerjob.common.serialize.JsonUtils;
+import tech.powerjob.common.utils.CollectionUtils;
+import tech.powerjob.server.common.utils.TestUtils;
+import tech.powerjob.server.core.alarm.AlarmCenter;
+import tech.powerjob.server.core.alarm.module.JobInstanceAlarm;
+import tech.powerjob.server.extension.alarm.AlarmTarget;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+/**
+ * For the development team (PRO users may also use it for self-checks, lol)
+ * Tests components that strongly depend on the runtime environment, such as mail alarms
+ *
+ * @author tjq
+ * @since 2023/7/31
+ */
+@Slf4j
+@RestController
+@RequestMapping("/test")
+public class TestController {
+
+ @Value("${server.port}")
+ private int port;
+
+ @Resource
+ private AlarmCenter alarmCenter;
+
+ @RequestMapping("/io")
+ public Map<String, Object> io(@RequestBody Map<String, Object> input) {
+ log.info("[TestController] input: {}", JsonUtils.toJSONString(input));
+ return input;
+ }
+
+ @GetMapping("/check")
+ public void check() {
+ Map<String, Object> testConfig = TestUtils.fetchTestConfig();
+ if (CollectionUtils.isEmpty(testConfig)) {
+ log.info("[TestController] testConfig not exist, skip check!");
+ return;
+ }
+
+ log.info("[TestController] testConfig: {}", JsonUtils.toJSONString(testConfig));
+
+ testAlarmCenter();
+ }
+
+ void testAlarmCenter() {
+ JobInstanceAlarm jobInstanceAlarm = new JobInstanceAlarm().setAppId(277).setJobId(1).setInstanceId(2)
+ .setJobName("test-alarm").setJobParams("jobParams").setInstanceParams("instanceParams")
+ .setExecuteType(1).setFinishedTime(System.currentTimeMillis());
+
+ AlarmTarget target = new AlarmTarget().setName("ald").setPhone("208140").setExtra("extra")
+ .setPhone(MapUtils.getString(TestUtils.fetchTestConfig(), TestUtils.KEY_PHONE_NUMBER))
+ .setEmail("tjq@zju.edu.cn")
+ .setWebHook(localUrlPath().concat("/test/io"));
+
+ log.info("[TestController] start to testAlarmCenter, target: {}", target);
+ alarmCenter.alarmFailed(jobInstanceAlarm, Lists.newArrayList(target));
+ }
+
+ private String localUrlPath() {
+ return String.format("http://127.0.0.1:%d", port);
+ }
+}
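A quick way to exercise this controller on a running server is a plain JDK HTTP call; the /test/check path comes from the mappings above, while the host and the 7700 web port are assumptions for a default local deployment.

import java.net.HttpURLConnection;
import java.net.URL;

class SelfCheckSketch {

    public static void main(String[] args) throws Exception {
        // GET /test/check triggers the optional runtime self-check (alarm channels etc.)
        URL url = new URL("http://127.0.0.1:7700/test/check");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");
        System.out.println("self-check HTTP status: " + connection.getResponseCode());
        connection.disconnect();
    }
}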
diff --git a/powerjob-server/powerjob-server-starter/src/main/resources/application-daily.properties b/powerjob-server/powerjob-server-starter/src/main/resources/application-daily.properties
index 693186e6..e61cd400 100644
--- a/powerjob-server/powerjob-server-starter/src/main/resources/application-daily.properties
+++ b/powerjob-server/powerjob-server-starter/src/main/resources/application-daily.properties
@@ -11,8 +11,7 @@ spring.datasource.core.minimum-idle=5
####### MongoDB properties(Non-core configuration properties) #######
####### delete mongodb config to disable mongodb #######
-#oms.mongodb.enable=true
-#spring.data.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority
+#oms.storage.dfs.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority
####### Email properties(Non-core configuration properties) #######
####### Delete the following code to disable the mail #######
diff --git a/powerjob-server/powerjob-server-starter/src/main/resources/application-pre.properties b/powerjob-server/powerjob-server-starter/src/main/resources/application-pre.properties
index f08f673b..a19491e8 100644
--- a/powerjob-server/powerjob-server-starter/src/main/resources/application-pre.properties
+++ b/powerjob-server/powerjob-server-starter/src/main/resources/application-pre.properties
@@ -11,8 +11,7 @@ spring.datasource.core.minimum-idle=5
####### MongoDB properties(Non-core configuration properties) #######
####### delete mongodb config to disable mongodb #######
-oms.mongodb.enable=true
-spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-pre
+oms.storage.dfs.mongodb.uri=mongodb://remotehost:27017/powerjob-pre
####### Email properties(Non-core configuration properties) #######
####### Delete the following code to disable the mail #######
diff --git a/powerjob-server/powerjob-server-starter/src/main/resources/application-product.properties b/powerjob-server/powerjob-server-starter/src/main/resources/application-product.properties
index a3ed4fe1..53e47d17 100644
--- a/powerjob-server/powerjob-server-starter/src/main/resources/application-product.properties
+++ b/powerjob-server/powerjob-server-starter/src/main/resources/application-product.properties
@@ -11,8 +11,7 @@ spring.datasource.core.minimum-idle=5
####### MongoDB properties(Non-core configuration properties) #######
####### delete mongodb config to disable mongodb #######
-oms.mongodb.enable=true
-spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-product
+oms.storage.dfs.mongodb.uri=mongodb://localhost:27017/powerjob-product
####### Email properties(Non-core configuration properties) #######
####### Delete the following code to disable the mail #######
diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/DingTalkTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/DingTalkTest.java
index 9195ff1c..2b7883a2 100644
--- a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/DingTalkTest.java
+++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/DingTalkTest.java
@@ -1,12 +1,9 @@
package tech.powerjob.server.test;
import org.junit.jupiter.api.Disabled;
-import tech.powerjob.server.extension.defaultimpl.alarm.impl.DingTalkUtils;
-import com.google.common.collect.Lists;
+import tech.powerjob.server.core.alarm.impl.DingTalkUtils;
import org.junit.jupiter.api.Test;
-import java.util.List;
-
/**
* 测试钉钉消息工具
*
diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/GridFsTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/GridFsTest.java
deleted file mode 100644
index 729dc2f2..00000000
--- a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/GridFsTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package tech.powerjob.server.test;
-
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import tech.powerjob.server.persistence.mongodb.GridFsManager;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import javax.annotation.Resource;
-import java.io.File;
-import java.io.IOException;
-
-/**
- * GridFS 测试
- *
- * @author tjq
- * @since 2020/5/18
- */
-@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
-public class GridFsTest {
-
- @Resource
- private GridFsManager gridFsManager;
-
- @Test
- @Disabled
- public void testStore() throws IOException {
- /**
- File file = new File("/Users/tjq/Desktop/DistributeCompute/oms-template-origin.zip");
- gridFsManager.store(file, "test", "test.zip");
- **/
- }
-
- @Test
- @Disabled
- public void testDownload() throws IOException {
- /**
- File file = new File("/Users/tjq/Desktop/tmp/test-download.zip");
- gridFsManager.download(file, "test", "test.zip");
- **/
- }
-
- @Test
- @Disabled
- public void testDelete() {
- /**
- gridFsManager.deleteBefore("fs", 0);
- **/
- }
-
- @Test
- @Disabled
- public void testExists() {
- /**
- System.out.println(gridFsManager.exists("test", "test.zip"));
- System.out.println(gridFsManager.exists("test", "oms-sql.sql"));
- **/
- }
-
-}
diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/OmsLogTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/OmsLogTest.java
deleted file mode 100644
index 2fa7a093..00000000
--- a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/OmsLogTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package tech.powerjob.server.test;
-
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import tech.powerjob.server.common.utils.OmsFileUtils;
-import tech.powerjob.server.persistence.mongodb.GridFsManager;
-import tech.powerjob.server.core.scheduler.CleanService;
-import com.mongodb.client.gridfs.model.GridFSFile;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.data.mongodb.core.query.Criteria;
-import org.springframework.data.mongodb.core.query.Query;
-import org.springframework.data.mongodb.gridfs.GridFsTemplate;
-import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import javax.annotation.Resource;
-import java.util.Date;
-import java.util.function.Consumer;
-
-/**
- * 在线日志测试
- *
- * @author tjq
- * @since 2020/5/11
- */
-
-@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
-@Disabled
-public class OmsLogTest {
-
- @Resource
- private CleanService cleanService;
- @Resource
- private GridFsTemplate gridFsTemplate;
-
- @Test
- public void testLocalLogCleaner() {
- cleanService.cleanLocal(OmsFileUtils.genLogDirPath(), 0);
- }
-
- @Test
- public void testRemoteLogCleaner() {
- cleanService.cleanRemote(GridFsManager.LOG_BUCKET, 0);
- }
-
- @Test
- public void testGridFsQuery() {
- Query mongoQuery = Query.query(Criteria.where("uploadDate").gt(new Date()));
- gridFsTemplate.find(mongoQuery).forEach(new Consumer() {
- @Override
- public void accept(GridFSFile gridFSFile) {
- System.out.println(gridFSFile.getFilename());
- }
- });
- }
-}
diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml
index d125c0bb..e8aba22e 100644
--- a/powerjob-worker-agent/pom.xml
+++ b/powerjob-worker-agent/pom.xml
@@ -5,24 +5,24 @@
powerjob
tech.powerjob
- 4.3.3
+ 4.3.4
4.0.0
powerjob-worker-agent
- 4.3.3
+ 4.3.4
jar
- 4.3.3
+ 4.3.4
1.2.9
4.3.2
5.3.23
2.3.4.RELEASE
- 4.3.3
+ 4.3.4
8.0.28
diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml
index cc616a8f..b3decf98 100644
--- a/powerjob-worker-samples/pom.xml
+++ b/powerjob-worker-samples/pom.xml
@@ -5,18 +5,18 @@
powerjob
tech.powerjob
- 4.3.3
+ 4.3.4
4.0.0
powerjob-worker-samples
- 4.3.3
+ 4.3.4
2.7.4
- 4.3.3
+ 4.3.4
1.2.83
- 4.3.3
+ 4.3.4
true
diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java
index 215a95a0..2b77c370 100644
--- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java
+++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java
@@ -17,6 +17,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -40,7 +41,7 @@ public class MapReduceProcessorDemo implements MapReduceProcessor {
log.info("taskContext:{}", JsonUtils.toJSONString(context));
// 根据控制台参数获取MR批次及子任务大小
- final JSONObject jobParams = JSONObject.parseObject(context.getJobParams());
+ final JSONObject jobParams = Optional.ofNullable(context.getJobParams()).map(JSONObject::parseObject).orElse(new JSONObject());
Integer batchSize = (Integer) jobParams.getOrDefault("batchSize", 100);
Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10);
diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml
index 50c6314c..73f81b69 100644
--- a/powerjob-worker-spring-boot-starter/pom.xml
+++ b/powerjob-worker-spring-boot-starter/pom.xml
@@ -5,16 +5,16 @@
powerjob
tech.powerjob
- 4.3.3
+ 4.3.4
4.0.0
powerjob-worker-spring-boot-starter
- 4.3.3
+ 4.3.4
jar
- 4.3.3
+ 4.3.4
2.7.4
diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml
index 312ac0b7..3297a182 100644
--- a/powerjob-worker/pom.xml
+++ b/powerjob-worker/pom.xml
@@ -5,12 +5,12 @@
powerjob
tech.powerjob
- 4.3.3
+ 4.3.4
4.0.0
powerjob-worker
- 4.3.3
+ 4.3.4
jar
@@ -21,10 +21,10 @@
1.2.9
- 4.3.3
- 4.3.3
- 4.3.3
- 4.3.3
+ 4.3.4
+ 4.3.4
+ 4.3.4
+ 4.3.4
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java
index 61ea7271..bbe682d2 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java
@@ -3,12 +3,14 @@ package tech.powerjob.worker;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
+import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.common.utils.NetUtils;
+import tech.powerjob.common.utils.PropertyUtils;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.engine.EngineConfig;
@@ -79,9 +81,13 @@ public class PowerJobWorker {
log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
}
- // 初始化元数据
- String workerAddress = NetUtils.getLocalHost() + ":" + config.getPort();
- workerRuntime.setWorkerAddress(workerAddress);
+ // Initialize network info, distinguishing the advertised (external) address from the local bind address; the advertised address is used for all outward-facing communication
+ String localBindIp = NetUtils.getLocalHost();
+ int localBindPort = config.getPort();
+ String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, localBindIp);
+ String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort));
+ log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort);
+ workerRuntime.setWorkerAddress(Address.toFullAddress(externalIp, Integer.parseInt(externalPort)));
// 初始化 线程池
final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());
@@ -100,7 +106,7 @@ public class PowerJobWorker {
EngineConfig engineConfig = new EngineConfig()
.setType(config.getProtocol().name())
.setServerType(ServerType.WORKER)
- .setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort()))
+ .setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort))
.setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));
EngineOutput engineOutput = remoteEngine.start(engineConfig);
@@ -115,7 +121,7 @@ public class PowerJobWorker {
log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");
// 初始化日志系统
- OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, workerRuntime.getTransporter(), serverDiscoveryService);
+ OmsLogHandler omsLogHandler = new OmsLogHandler(workerRuntime.getWorkerAddress(), workerRuntime.getTransporter(), serverDiscoveryService);
workerRuntime.setOmsLogHandler(omsLogHandler);
// 初始化存储
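The worker-side counterpart of the server change above: the worker keeps binding to its local NIC but registers an externally reachable address whenever the NAT-related -D properties are set. A minimal sketch of that resolution, using only helpers already imported in this file; the flag values in the comment are placeholders.

import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.PropertyUtils;
import tech.powerjob.remote.framework.base.Address;

class WorkerAdvertisedAddressSketch {

    // e.g. a worker behind NAT launched with (hypothetical values):
    //   -Dpowerjob.network.external.address=198.51.100.7 -Dpowerjob.network.external.port=37001
    // binds to its local NIC but reports 198.51.100.7:37001 to the server.
    static String advertisedAddress(int bindPort) {
        String bindIp = NetUtils.getLocalHost();
        String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, bindIp);
        String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(bindPort));
        return Address.toFullAddress(externalIp, Integer.parseInt(externalPort));
    }
}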
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java
index a1edebc4..58a6bc3a 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java
@@ -73,6 +73,7 @@ public abstract class TaskTracker {
instanceInfo.setThreadConcurrency(req.getThreadConcurrency());
instanceInfo.setTaskRetryNum(req.getTaskRetryNum());
instanceInfo.setLogConfig(req.getLogConfig());
+ instanceInfo.setInstanceTimeoutMS(req.getInstanceTimeoutMS());
// 特殊处理超时时间
if (instanceInfo.getInstanceTimeoutMS() <= 0) {