Merge branch '4.3.6'

This commit is contained in:
tjq 2023-09-03 14:05:03 +08:00
commit 9b7c237cf0
39 changed files with 540 additions and 156 deletions

View File

@ -6,7 +6,7 @@
<groupId>tech.powerjob</groupId>
<artifactId>powerjob</artifactId>
<version>4.3.5</version>
<version>4.3.6</version>
<packaging>pom</packaging>
<name>powerjob</name>
<url>http://www.powerjob.tech</url>

View File

@ -5,18 +5,18 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>4.3.5</version>
<version>4.3.6</version>
<packaging>jar</packaging>
<properties>
<junit.version>5.9.1</junit.version>
<fastjson.version>1.2.83</fastjson.version>
<powerjob.common.version>4.3.5</powerjob.common.version>
<powerjob.common.version>4.3.6</powerjob.common.version>
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
</properties>

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>4.3.5</version>
<version>4.3.6</version>
<packaging>jar</packaging>
<properties>

View File

@ -0,0 +1,24 @@
package tech.powerjob.common.model;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* WorkerAppInfo
*
* @author tjq
* @since 2023/9/2
*/
@Data
@NoArgsConstructor
@Accessors(chain = true)
public class WorkerAppInfo implements Serializable {
/**
* 应用唯一 ID
*/
private Long appId;
}

View File

@ -0,0 +1,10 @@
package tech.powerjob.common.response;
/**
* 主要目的消除 idea 烦人的类型提示
*
* @author tjq
* @since 2023/9/2
*/
public class ObjectResultDTO extends ResultDTO<Object> {
}

View File

@ -170,8 +170,8 @@ public class NetUtils {
log.warn("[Net] findNetworkInterface failed", e);
}
// sort by interface index, the smaller is preferred. 部分用户反馈 IP 获取逻辑反而劣化了先注释
// validNetworkInterfaces.sort(Comparator.comparingInt(NetworkInterface::getIndex));
// sort by interface index, the smaller is preferred.
validNetworkInterfaces.sort(Comparator.comparingInt(NetworkInterface::getIndex));
// Try to find the preferred one
for (NetworkInterface networkInterface : validNetworkInterfaces) {

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-official-processors</artifactId>
<version>4.3.5</version>
<version>4.3.6</version>
<packaging>jar</packaging>
<properties>
@ -20,9 +20,9 @@
<!-- 不会被打包的部分scope 只能是 test 或 provide -->
<junit.version>5.9.1</junit.version>
<logback.version>1.2.9</logback.version>
<powerjob.worker.version>4.3.5</powerjob.worker.version>
<powerjob.worker.version>4.3.6</powerjob.worker.version>
<spring.jdbc.version>5.2.9.RELEASE</spring.jdbc.version>
<h2.db.version>2.1.214</h2.db.version>
<h2.db.version>2.2.220</h2.db.version>
<mysql.version>8.0.28</mysql.version>
<spring.version>5.3.23</spring.version>

View File

@ -19,6 +19,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
/**
* 脚本处理器
@ -78,17 +79,23 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
String result;
final Charset charset = getCharset();
try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) {
try {
InputStream is = process.getInputStream();
InputStream es = process.getErrorStream();
POOL.execute(() -> copyStream(is, inputBuilder, omsLogger, charset));
POOL.execute(() -> copyStream(es, errorBuilder, omsLogger, charset));
ForkJoinTask<?> inputSubmit = POOL.submit(() -> copyStream(is, inputBuilder, omsLogger, charset));
ForkJoinTask<?> errorSubmit = POOL.submit(() -> copyStream(es, errorBuilder, omsLogger, charset));
success = process.waitFor() == 0;
// 阻塞等待日志读取
inputSubmit.get();
errorSubmit.get();
} catch (InterruptedException ie) {
omsLogger.info("[SYSTEM] ScriptProcessor has been interrupted");
} finally {
result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder.toString(), errorBuilder.toString());
result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder, errorBuilder);
}
return new ProcessResult(success, result);
}
@ -132,11 +139,11 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
return scriptPath;
}
private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) {
private void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) {
String line;
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) {
while ((line = br.readLine()) != null) {
sb.append(line);
sb.append(line).append(System.lineSeparator());
// 同步到在线日志
omsLogger.info(line);
}
@ -145,6 +152,13 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
omsLogger.warn("[SYSTEM] copyStream failed.", e);
sb.append("Exception: ").append(e);
} finally {
try {
is.close();
} catch (IOException e) {
log.warn("[ScriptProcessor] close stream failed.", e);
omsLogger.warn("[SYSTEM] close stream failed.", e);
}
}
}

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -20,9 +20,9 @@
<maven-jar-plugin.version>3.2.2</maven-jar-plugin.version>
<logback.version>1.2.9</logback.version>
<springboot.version>2.7.4</springboot.version>
<powerjob-remote-impl-http.version>4.3.5</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.3.5</powerjob-remote-impl-akka.version>
<springboot.version>2.7.14</springboot.version>
<powerjob-remote-impl-http.version>4.3.6</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.3.6</powerjob-remote-impl-akka.version>
<gatling.version>3.9.0</gatling.version>
<gatling-maven-plugin.version>4.2.9</gatling-maven-plugin.version>

View File

@ -5,11 +5,11 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<version>4.3.5</version>
<version>4.3.6</version>
<artifactId>powerjob-remote-framework</artifactId>
<properties>
@ -17,7 +17,7 @@
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<powerjob-common.version>4.3.5</powerjob-common.version>
<powerjob-common.version>4.3.6</powerjob-common.version>
<reflections.version>0.10.2</reflections.version>

View File

@ -5,19 +5,19 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-akka</artifactId>
<version>4.3.5</version>
<version>4.3.6</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<powerjob-remote-framework.version>4.3.5</powerjob-remote-framework.version>
<powerjob-remote-framework.version>4.3.6</powerjob-remote-framework.version>
<akka.version>2.6.13</akka.version>
</properties>

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-http</artifactId>
<version>4.3.5</version>
<version>4.3.6</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
@ -18,7 +18,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<vertx.version>4.3.7</vertx.version>
<powerjob-remote-framework.version>4.3.5</powerjob-remote-framework.version>
<powerjob-remote-framework.version>4.3.6</powerjob-remote-framework.version>
</properties>
<dependencies>

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>4.3.5</version>
<version>4.3.6</version>
<packaging>pom</packaging>
<modules>
@ -26,21 +26,21 @@
<properties>
<springboot.version>2.7.4</springboot.version>
<springboot.version>2.7.14</springboot.version>
<!-- MySQL version that corresponds to spring-boot-dependencies version. -->
<mysql.version>8.0.30</mysql.version>
<mysql.version>8.0.33</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version>
<mssql-jdbc.version>7.4.1.jre8</mssql-jdbc.version>
<db2-jdbc.version>11.5.0.0</db2-jdbc.version>
<postgresql.version>42.2.14</postgresql.version>
<h2.db.version>2.1.214</h2.db.version>
<postgresql.version>42.6.0</postgresql.version>
<h2.db.version>2.2.220</h2.db.version>
<mongodb-driver-sync.version>4.10.2</mongodb-driver-sync.version>
<zip4j.version>2.11.2</zip4j.version>
<jgit.version>5.7.0.202003110725-r</jgit.version>
<jgit.version>5.13.2.202306221912-r</jgit.version>
<mvn.invoker.version>3.0.1</mvn.invoker.version>
<commons.net.version>3.8.0</commons.net.version>
<commons.net.version>3.9.0</commons.net.version>
<fastjson.version>1.2.83</fastjson.version>
<dingding.version>1.0.1</dingding.version>
@ -48,13 +48,14 @@
<maven.deploy.skip>true</maven.deploy.skip>
<groovy.version>3.0.10</groovy.version>
<cron-utils.version>9.1.6</cron-utils.version>
<cron-utils.version>9.2.1</cron-utils.version>
<powerjob-common.version>4.3.5</powerjob-common.version>
<powerjob-remote-impl-http.version>4.3.5</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.3.5</powerjob-remote-impl-akka.version>
<powerjob-common.version>4.3.6</powerjob-common.version>
<powerjob-remote-impl-http.version>4.3.6</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.3.6</powerjob-remote-impl-akka.version>
<springdoc-openapi-ui.version>1.6.14</springdoc-openapi-ui.version>
<aliyun-sdk-oss.version>3.17.1</aliyun-sdk-oss.version>
<minio.version>8.5.2</minio.version>
<commons-collections4.version>4.4</commons-collections4.version>
</properties>
@ -113,7 +114,12 @@
<artifactId>aliyun-sdk-oss</artifactId>
<version>${aliyun-sdk-oss.version}</version>
</dependency>
<!-- 存储扩展-Minio未使用可移除 -->
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>${minio.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-collections4 -->
<dependency>
<groupId>org.apache.commons</groupId>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -40,6 +40,10 @@
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
</dependency>
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -4,10 +4,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import tech.powerjob.server.extension.dfs.DFsService;
import tech.powerjob.server.persistence.storage.impl.AliOssService;
import tech.powerjob.server.persistence.storage.impl.EmptyDFsService;
import tech.powerjob.server.persistence.storage.impl.GridFsService;
import tech.powerjob.server.persistence.storage.impl.MySqlSeriesDfsService;
import tech.powerjob.server.persistence.storage.impl.*;
/**
* 初始化内置的存储服务
@ -36,6 +33,12 @@ public class StorageConfiguration {
return new AliOssService();
}
@Bean
@Conditional(MinioOssService.MinioOssCondition.class)
public DFsService initMinioOssFs() {
return new MinioOssService();
}
@Bean
@Conditional(EmptyDFsService.EmptyCondition.class)
public DFsService initEmptyDfs() {

View File

@ -0,0 +1,235 @@
package tech.powerjob.server.persistence.storage.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.minio.*;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.core.env.Environment;
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.storage.AbstractDFsService;
import javax.annotation.Priority;
import java.nio.file.Files;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* MINIO support
* <a href="https://min.io/">High Performance Object Storage</a>
* 配置项
* oms.storage.dfs.minio.endpoint
* oms.storage.dfs.minio.bucketName
* oms.storage.dfs.minio.accessKey
* oms.storage.dfs.minio.secretKey
*
* @author xinyi
* @since 2023/8/21
*/
@Slf4j
@Priority(value = Integer.MAX_VALUE - 3)
@Conditional(MinioOssService.MinioOssCondition.class)
public class MinioOssService extends AbstractDFsService {
private static final String TYPE_MINIO = "minio";
private static final String KEY_ENDPOINT = "endpoint";
private static final String KEY_BUCKET_NAME = "bucketName";
private static final String ACCESS_KEY = "accessKey";
private static final String SECRET_KEY = "secretKey";
private MinioClient minioClient;
private String bucket;
private static final String NO_SUCH_KEY = "NoSuchKey";
@Override
public void store(StoreRequest storeRequest) {
try {
minioClient.uploadObject(UploadObjectArgs.builder()
.bucket(this.bucket)
.object(parseFileName(storeRequest.getFileLocation()))
.filename(storeRequest.getLocalFile().getPath())
.contentType(Files.probeContentType(storeRequest.getLocalFile().toPath()))
.build());
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}
@Override
public void download(DownloadRequest downloadRequest) {
try {
FileUtils.forceMkdirParent(downloadRequest.getTarget());
// 下载文件
minioClient.downloadObject(
DownloadObjectArgs.builder()
.bucket(this.bucket)
.object(parseFileName(downloadRequest.getFileLocation()))
.filename(downloadRequest.getTarget().getAbsolutePath())
.build());
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}
/**
* 获取文件元
*
* @param fileLocation 文件位置
*/
@Override
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) {
try {
StatObjectResponse stat = minioClient.statObject(StatObjectArgs.builder()
.bucket(this.bucket)
.object(parseFileName(fileLocation))
.build());
return Optional.ofNullable(stat).map(minioStat -> {
Map<String, Object> metaInfo = Maps.newHashMap();
if (stat.userMetadata() != null) {
metaInfo.putAll(stat.userMetadata());
}
return new FileMeta()
.setLastModifiedTime(Date.from(stat.lastModified().toInstant()))
.setLength(stat.size())
.setMetaInfo(metaInfo);
});
} catch (Exception oe) {
String errorCode = oe.getMessage();
if (NO_SUCH_KEY.equalsIgnoreCase(errorCode)) {
return Optional.empty();
}
ExceptionUtils.rethrow(oe);
}
return Optional.empty();
}
private static String parseFileName(FileLocation fileLocation) {
return String.format("%s/%s", fileLocation.getBucket(), fileLocation.getName());
}
/**
* 清理过期文件
*
* @param bucket 桶名
* @param days 日期
*/
@Override
public void cleanExpiredFiles(String bucket, int days) {
/*
使用Minio的管理界面或Minio客户端命令行工具设置对象的生命周期规则在生命周期规则中定义文件的过期时间Minio将自动根据设置的规则删除过期的文件
*/
}
/**
* 释放连接
*/
@Override
public void destroy() {
//minioClient.close();
}
/**
* 初始化minio
*
* @param applicationContext /
*/
@Override
protected void init(ApplicationContext applicationContext) {
Environment environment = applicationContext.getEnvironment();
String endpoint = fetchProperty(environment, TYPE_MINIO, KEY_ENDPOINT);
String bucketName = fetchProperty(environment, TYPE_MINIO, KEY_BUCKET_NAME);
String accessKey = fetchProperty(environment, TYPE_MINIO, ACCESS_KEY);
String secretKey = fetchProperty(environment, TYPE_MINIO, SECRET_KEY);
try {
initOssClient(endpoint, bucketName, accessKey, secretKey);
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
}
/**
* 创建minio连接并且创建桶
*
* @param endpoint 端口
* @param bucketName 桶名
* @param accessKey 访问密钥
* @param secretKey 秘密密钥
*/
public void initOssClient(String endpoint, String bucketName, String accessKey, String secretKey) {
log.info("[Minio] init OSS by config: endpoint={}, bucketName={}, accessKey={}, secretKey={}", endpoint, bucketName, accessKey, secretKey);
if (StringUtils.isEmpty(bucketName)) {
throw new IllegalArgumentException("'oms.storage.dfs.minio.bucketName' can't be empty, please creat a bucket in minio oss console then config it to powerjob");
}
this.bucket = bucketName;
minioClient = MinioClient.builder().endpoint(endpoint).credentials(accessKey, secretKey).build();
createBucket(bucketName);
log.info("[Minio] initialize OSS successfully!");
}
/**
* 创建 bucket
*
* @param bucketName 桶名
*/
@SneakyThrows(Exception.class)
public void createBucket(String bucketName) {
if (!bucketExists(bucketName)) {
minioClient.makeBucket(MakeBucketArgs.builder()
.bucket(bucketName).build());
}
String policy = "{\n" +
" \"Version\": \"2012-10-17\",\n" +
" \"Statement\": [\n" +
" {\n" +
" \"Action\": [\n" +
" \"s3:GetObject\"\n" +
" ],\n" +
" \"Effect\": \"Allow\",\n" +
" \"Principal\": {\n" +
" \"AWS\": [\n" +
" \"*\"\n" +
" ]\n" +
" },\n" +
" \"Resource\": [\n" +
" \"arn:aws:s3:::" + bucketName + "/*\"\n" +
" ]\n" +
" }\n" +
" ]\n" +
"}";
minioClient.setBucketPolicy(SetBucketPolicyArgs.builder().bucket(bucketName).config(policy).build());
}
/**
* 判断 bucket是否存在
*
* @param bucketName: 桶名
* @return boolean
*/
@SneakyThrows(Exception.class)
public boolean bucketExists(String bucketName) {
return minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
}
public static class MinioOssCondition extends PropertyAndOneBeanCondition {
@Override
protected List<String> anyConfigKey() {
return Lists.newArrayList("oms.storage.dfs.minio.endpoint");
}
@Override
protected Class<?> beanType() {
return DFsService.class;
}
}
}

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -7,6 +7,7 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import tech.powerjob.common.model.WorkerAppInfo;
import tech.powerjob.common.request.ServerDiscoveryRequest;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.utils.CommonUtils;
@ -50,6 +51,16 @@ public class ServerController implements ServerInfoAware {
orElseGet(() -> ResultDTO.failed(String.format("app(%s) is not registered! Please register the app in oms-console first.", appName)));
}
@GetMapping("/assertV2")
public ResultDTO<WorkerAppInfo> assertAppNameV2(String appName) {
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findByAppName(appName);
return appInfoOpt.map(appInfoDO -> {
WorkerAppInfo workerAppInfo = new WorkerAppInfo().setAppId(appInfoDO.getId());
return ResultDTO.success(workerAppInfo);
}).
orElseGet(() -> ResultDTO.failed(String.format("app(%s) is not registered! Please register the app in oms-console first.", appName)));
}
@GetMapping("/acquire")
public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
return ResultDTO.success(serverElectionService.elect(request));

View File

@ -5,24 +5,24 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>4.3.5</version>
<version>4.3.6</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>4.3.5</powerjob.worker.version>
<powerjob.worker.version>4.3.6</powerjob.worker.version>
<logback.version>1.2.9</logback.version>
<picocli.version>4.3.2</picocli.version>
<spring.version>5.3.23</spring.version>
<spring.boot.version>2.3.4.RELEASE</spring.boot.version>
<powerjob.official.processors.version>4.3.5</powerjob.official.processors.version>
<powerjob.official.processors.version>4.3.6</powerjob.official.processors.version>
<!-- dependency for dynamic sql processor -->
<mysql.version>8.0.28</mysql.version>

View File

@ -5,18 +5,18 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>4.3.5</version>
<version>4.3.6</version>
<properties>
<springboot.version>2.7.4</springboot.version>
<powerjob.worker.starter.version>4.3.5</powerjob.worker.starter.version>
<springboot.version>2.7.14</springboot.version>
<powerjob.worker.starter.version>4.3.6</powerjob.worker.starter.version>
<fastjson.version>1.2.83</fastjson.version>
<powerjob.official.processors.version>4.3.5</powerjob.official.processors.version>
<powerjob.official.processors.version>4.3.6</powerjob.official.processors.version>
<!-- 部署时跳过该module -->
<maven.deploy.skip>true</maven.deploy.skip>

View File

@ -4,7 +4,7 @@ spring.jpa.open-in-view=false
# Whether to enable PowerJob Worker, default is true
powerjob.worker.enabled=true
# Turn on test mode and do not force the server connection to be verified
powerjob.worker.enable-test-mode=false
powerjob.worker.allow-lazy-connect-server=false
# Transport port, default is 27777
powerjob.worker.port=27777
# Application name, used for grouping applications. Recommend to set the same value as project name.

View File

@ -5,17 +5,17 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>4.3.5</version>
<version>4.3.6</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>4.3.5</powerjob.worker.version>
<springboot.version>2.7.4</springboot.version>
<powerjob.worker.version>4.3.6</powerjob.worker.version>
<springboot.version>2.7.14</springboot.version>
</properties>
<dependencies>

View File

@ -72,7 +72,7 @@ public class PowerJobAutoConfiguration {
* When enabledTestMode is set as true, PowerJob-worker no longer connects to PowerJob-server
* or validate appName.
*/
config.setEnableTestMode(worker.isEnableTestMode());
config.setAllowLazyConnectServer(worker.isAllowLazyConnectServer());
/*
* Max length of appended workflow context . Appended workflow context value that is longer than the value will be ignored.
*/

View File

@ -81,14 +81,14 @@ public class PowerJobProperties {
}
@Deprecated
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.enable-test-mode")
@DeprecatedConfigurationProperty(replacement = "powerjob.worker.allow-lazy-connect-server")
public boolean isEnableTestMode() {
return getWorker().enableTestMode;
return getWorker().isAllowLazyConnectServer();
}
@Deprecated
public void setEnableTestMode(boolean enableTestMode) {
getWorker().setEnableTestMode(enableTestMode);
getWorker().setAllowLazyConnectServer(enableTestMode);
}
/**
@ -146,10 +146,10 @@ public class PowerJobProperties {
*/
private int maxResultLength = 8192;
/**
* If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName.
* Test mode is used for conditions that your have no powerjob-server in your develop env, so you can't start up the application
* If allowLazyConnectServer is set as true, PowerJob worker allows launching without a direct connection to the server.
* allowLazyConnectServer is used for conditions that your have no powerjob-server in your develop env so you can't startup the application
*/
private boolean enableTestMode = false;
private boolean allowLazyConnectServer = false;
/**
* Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignored.
* {@link WorkflowContext} max length for #appendedContextData

View File

@ -5,26 +5,26 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.5</version>
<version>4.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>4.3.5</version>
<version>4.3.6</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.3.23</spring.version>
<h2.db.version>2.1.214</h2.db.version>
<h2.db.version>2.2.220</h2.db.version>
<hikaricp.version>4.0.3</hikaricp.version>
<junit.version>5.9.1</junit.version>
<logback.version>1.2.9</logback.version>
<powerjob-common.version>4.3.5</powerjob-common.version>
<powerjob-remote-framework.version>4.3.5</powerjob-remote-framework.version>
<powerjob-remote-impl-akka.version>4.3.5</powerjob-remote-impl-akka.version>
<powerjob-remote-impl-http.version>4.3.5</powerjob-remote-impl-http.version>
<powerjob-common.version>4.3.6</powerjob-common.version>
<powerjob-remote-framework.version>4.3.6</powerjob-remote-framework.version>
<powerjob-remote-impl-akka.version>4.3.6</powerjob-remote-impl-akka.version>
<powerjob-remote-impl-http.version>4.3.6</powerjob-remote-impl-http.version>
</properties>
<dependencies>

View File

@ -4,11 +4,8 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.model.WorkerAppInfo;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.PropertyUtils;
import tech.powerjob.remote.framework.base.Address;
@ -21,8 +18,9 @@ import tech.powerjob.worker.actors.ProcessorTrackerActor;
import tech.powerjob.worker.actors.TaskTrackerActor;
import tech.powerjob.worker.actors.WorkerActor;
import tech.powerjob.worker.background.OmsLogHandler;
import tech.powerjob.worker.background.ServerDiscoveryService;
import tech.powerjob.worker.background.WorkerHealthReporter;
import tech.powerjob.worker.background.discovery.PowerJobServerDiscoveryService;
import tech.powerjob.worker.background.discovery.ServerDiscoveryService;
import tech.powerjob.worker.common.PowerBannerPrinter;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.WorkerRuntime;
@ -36,7 +34,6 @@ import tech.powerjob.worker.processor.impl.JarContainerProcessorFactory;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -72,14 +69,14 @@ public class PowerJobWorker {
PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
CommonUtils.requireNonNull(config, "can't find PowerJobWorkerConfig, please set PowerJobWorkerConfig first");
ServerDiscoveryService serverDiscoveryService = new PowerJobServerDiscoveryService(config);
workerRuntime.setServerDiscoveryService(serverDiscoveryService);
try {
PowerBannerPrinter.print();
// 校验 appName
if (!config.isEnableTestMode()) {
assertAppName();
} else {
log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
}
WorkerAppInfo appInfo = serverDiscoveryService.assertApp();
workerRuntime.setAppInfo(appInfo);
// 初始化网络数据区别对待上报地址和本机绑定地址对外统一使用上报地址
String localBindIp = NetUtils.getLocalHost();
@ -113,10 +110,7 @@ public class PowerJobWorker {
workerRuntime.setTransporter(engineOutput.getTransporter());
// 连接 server
ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig());
serverDiscoveryService.start(workerRuntime.getExecutorManager().getCoreExecutor());
workerRuntime.setServerDiscoveryService(serverDiscoveryService);
serverDiscoveryService.timingCheck(workerRuntime.getExecutorManager().getCoreExecutor());
log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");
@ -142,38 +136,6 @@ public class PowerJobWorker {
}
}
@SuppressWarnings("rawtypes")
private void assertAppName() {
PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
String appName = config.getAppName();
Objects.requireNonNull(appName, "appName can't be empty!");
String url = "http://%s/server/assert?appName=%s";
for (String server : config.getServerAddress()) {
String realUrl = String.format(url, server, appName);
try {
String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);
if (resultDTO.isSuccess()) {
Long appId = Long.valueOf(resultDTO.getData().toString());
log.info("[PowerJobWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);
workerRuntime.setAppId(appId);
return;
}else {
log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
throw new PowerJobException(resultDTO.getMessage());
}
}catch (PowerJobException oe) {
throw oe;
}catch (Exception ignore) {
log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", realUrl);
}
}
log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress());
throw new PowerJobException("no server available!");
}
private ProcessorLoader buildProcessorLoader(WorkerRuntime runtime) {
List<ProcessorFactory> customPF = Optional.ofNullable(runtime.getWorkerConfig().getProcessorFactoryList()).orElse(Collections.emptyList());
List<ProcessorFactory> finalPF = Lists.newArrayList(customPF);

View File

@ -8,6 +8,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.worker.background.discovery.ServerDiscoveryService;
import tech.powerjob.worker.common.utils.TransportUtils;
import java.util.List;

View File

@ -1,13 +1,16 @@
package tech.powerjob.worker.background;
package tech.powerjob.worker.background.discovery;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.exception.ImpossibleException;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.WorkerAppInfo;
import tech.powerjob.common.request.ServerDiscoveryRequest;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.response.ObjectResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
@ -18,6 +21,7 @@ import tech.powerjob.worker.core.tracker.task.heavy.HeavyTaskTracker;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -25,13 +29,12 @@ import java.util.concurrent.TimeUnit;
* 服务发现
*
* @author tjq
* @since 2020/4/6
* @since 2023/9/2
*/
@Slf4j
public class ServerDiscoveryService {
public class PowerJobServerDiscoveryService implements ServerDiscoveryService {
private final Long appId;
private final PowerJobWorkerConfig config;
private final WorkerAppInfo appInfo = new WorkerAppInfo();
private String currentServerAddress;
@ -41,6 +44,8 @@ public class ServerDiscoveryService {
* 服务发现地址
*/
private static final String DISCOVERY_URL = "http://%s/server/acquire?%s";
private static final String ASSERT_URL = "http://%s/server/assert?appName=%s";
/**
* 失败次数
*/
@ -50,15 +55,76 @@ public class ServerDiscoveryService {
*/
private static final int MAX_FAILED_COUNT = 3;
private final PowerJobWorkerConfig config;
public ServerDiscoveryService(Long appId, PowerJobWorkerConfig config) {
this.appId = appId;
public PowerJobServerDiscoveryService(PowerJobWorkerConfig config) {
this.config = config;
}
public void start(ScheduledExecutorService timingPool) {
@Override
public WorkerAppInfo assertApp() {
try {
return assertApp0();
} catch (Exception e) {
if (config.isAllowLazyConnectServer()) {
log.warn("[PowerJobWorker] worker is not currently connected to the server, and because allowLazyConnectServer is configured to true it won't block the startup, but you have to be aware that this is dangerous in production environments!");
// 返回引用方便后续更新对象内属性
return appInfo;
}
ExceptionUtils.rethrow(e);
}
throw new ImpossibleException();
}
private WorkerAppInfo assertApp0() {
String appName = config.getAppName();
Objects.requireNonNull(appName, "appName can't be empty!");
for (String server : config.getServerAddress()) {
String realUrl = String.format(ASSERT_URL, server, appName);
try {
String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
ObjectResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ObjectResultDTO.class);
if (resultDTO.isSuccess()) {
Object resultDataContent = resultDTO.getData();
log.info("[PowerJobWorker] assert appName({}) succeed, result from server is: {}.", appName, resultDataContent);
// 兼容老版本响应为数字
if (StringUtils.isNumeric(resultDataContent.toString())) {
Long appId = Long.valueOf(resultDataContent.toString());
this.appInfo.setAppId(appId);
return appInfo;
}
// 新版本接口直接下发 AppInfo 内容后续可扩展安全加密等信息
WorkerAppInfo serverAppInfo = JsonUtils.parseObject(JsonUtils.toJSONString(resultDataContent), WorkerAppInfo.class);
appInfo.setAppId(serverAppInfo.getAppId());
return appInfo;
} else {
log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
throw new PowerJobException(resultDTO.getMessage());
}
} catch (PowerJobException oe) {
throw oe;
} catch (Exception ignore) {
log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", realUrl);
}
}
log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress());
throw new PowerJobException("no server available!");
}
@Override
public String getCurrentServerAddress() {
return currentServerAddress;
}
@Override
public void timingCheck(ScheduledExecutorService timingPool) {
this.currentServerAddress = discovery();
if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) {
if (StringUtils.isEmpty(this.currentServerAddress) && !config.isAllowLazyConnectServer()) {
throw new PowerJobException("can't find any available server, this worker has been quarantined.");
}
// 这里必须保证成功
@ -72,13 +138,18 @@ public class ServerDiscoveryService {
, 10, 10, TimeUnit.SECONDS);
}
public String getCurrentServerAddress() {
return currentServerAddress;
}
private String discovery() {
// 只有允许延迟加载模式下appId 才可能为空每次服务发现前都重新尝试获取 appInfo由于是懒加载链路此处完全忽略异常
if (appInfo.getAppId() == null || appInfo.getAppId() < 0) {
try {
assertApp0();
} catch (Exception e) {
log.warn("[PowerDiscovery] assertAppName in discovery stage failed, msg: {}", e.getMessage());
return null;
}
}
if (ip2Address.isEmpty()) {
config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));
}
@ -131,7 +202,7 @@ public class ServerDiscoveryService {
}
}
@SuppressWarnings("rawtypes")
private String acquire(String httpServerAddress) {
String result = null;
String url = buildServerDiscoveryUrl(httpServerAddress);
@ -141,7 +212,7 @@ public class ServerDiscoveryService {
}
if (!StringUtils.isEmpty(result)) {
try {
ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
ObjectResultDTO resultDTO = JsonUtils.parseObject(result, ObjectResultDTO.class);
if (resultDTO.isSuccess()) {
return resultDTO.getData().toString();
}
@ -154,7 +225,7 @@ public class ServerDiscoveryService {
private String buildServerDiscoveryUrl(String address) {
ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
.setAppId(appId)
.setAppId(appInfo.getAppId())
.setCurrentServer(currentServerAddress)
.setProtocol(config.getProtocol().name().toUpperCase());

View File

@ -0,0 +1,32 @@
package tech.powerjob.worker.background.discovery;
import tech.powerjob.common.model.WorkerAppInfo;
import java.util.concurrent.ScheduledExecutorService;
/**
* 服务发现
*
* @author tjq
* @since 2023/9/2
*/
public interface ServerDiscoveryService {
/**
* 鉴权 & 附带信息下发
* @return appInfo
*/
WorkerAppInfo assertApp();
/**
* 获取当前的 server 地址
* @return server 地址
*/
String getCurrentServerAddress();
/**
* 定时检查
* @param timingPool timingPool
*/
void timingCheck(ScheduledExecutorService timingPool);
}

View File

@ -57,10 +57,10 @@ public class PowerJobWorkerConfig {
*/
private StoreStrategy storeStrategy = StoreStrategy.DISK;
/**
* If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName.
* Test mode is used for conditions that your have no powerjob-server in your develop env so you can't startup the application
* If allowLazyConnectServer is set as true, PowerJob worker allows launching without a direct connection to the server.
* allowLazyConnectServer is used for conditions that your have no powerjob-server in your develop env so you can't startup the application
*/
private boolean enableTestMode = false;
private boolean allowLazyConnectServer = false;
/**
* Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore.
* {@link WorkflowContext} max length for #appendedContextData

View File

@ -1,13 +1,16 @@
package tech.powerjob.worker.common;
import lombok.Data;
import tech.powerjob.common.model.WorkerAppInfo;
import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.worker.background.OmsLogHandler;
import tech.powerjob.worker.background.ServerDiscoveryService;
import tech.powerjob.worker.background.discovery.ServerDiscoveryService;
import tech.powerjob.worker.core.executor.ExecutorManager;
import tech.powerjob.worker.persistence.TaskPersistenceService;
import tech.powerjob.worker.processor.ProcessorLoader;
import java.util.Optional;
/**
* store worker's runtime
*
@ -17,7 +20,10 @@ import tech.powerjob.worker.processor.ProcessorLoader;
@Data
public class WorkerRuntime {
private Long appId;
/**
* App 基础信息
*/
private WorkerAppInfo appInfo;
/**
* 当前执行器地址
*/
@ -42,4 +48,8 @@ public class WorkerRuntime {
private ServerDiscoveryService serverDiscoveryService;
private TaskPersistenceService taskPersistenceService;
public Long getAppId() {
return Optional.ofNullable(appInfo.getAppId()).orElse(-1L);
}
}

View File

@ -68,7 +68,8 @@ public class SystemInfoUtils {
}
metrics.setDiskUsed(bytes2GB(total - free));
metrics.setDiskTotal(bytes2GB(total));
// 防止内存溢出导致total为负数,导致找不到worker实例
metrics.setDiskTotal(bytes2GB(total < 0 ? Long.MAX_VALUE >> 6 : total ));
metrics.setDiskUsage(miniDouble(metrics.getDiskUsed() / metrics.getDiskTotal()));
}