diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml
deleted file mode 100644
index 65635e0c..00000000
--- a/.github/workflows/docker-image.yml
+++ /dev/null
@@ -1,40 +0,0 @@
-name: Docker Image CI
-
-on:
- push:
- branches: [ master ]
-
-jobs:
-
- build:
-
- runs-on: ubuntu-latest
-
- steps:
- - uses: actions/checkout@v2
- - name: Build the Docker image
- run: mvn clean package -Pdev -DskipTests -U -e && /bin/cp -rf powerjob-server/powerjob-server-starter/target/*.jar powerjob-server/docker/powerjob-server.jar && /bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar && /bin/cp -rf powerjob-worker-samples/target/*.jar powerjob-worker-samples/powerjob-worker-samples.jar
- - uses: docker/build-push-action@v1
- with:
- username: ${{ secrets.DOCKER_USERNAME }}
- password: ${{ secrets.DOCKER_PASSWORD }}
- repository: tjqq/powerjob-server
- tag_with_ref: true
- tags: latest
- path: powerjob-server/docker/
- - uses: docker/build-push-action@v1
- with:
- username: ${{ secrets.DOCKER_USERNAME }}
- password: ${{ secrets.DOCKER_PASSWORD }}
- repository: tjqq/powerjob-agent
- tag_with_ref: true
- tags: latest
- path: powerjob-worker-agent/
- - uses: docker/build-push-action@v1
- with:
- username: ${{ secrets.DOCKER_USERNAME }}
- password: ${{ secrets.DOCKER_PASSWORD }}
- repository: tjqq/powerjob-worker-samples
- tag_with_ref: true
- tags: latest
- path: powerjob-worker-samples/
\ No newline at end of file
diff --git a/.github/workflows/docker_publish.yml b/.github/workflows/docker_publish.yml
new file mode 100644
index 00000000..4b0d7f1e
--- /dev/null
+++ b/.github/workflows/docker_publish.yml
@@ -0,0 +1,69 @@
+name: build_docker
+
+on:
+ push:
+ branches: [master]
+ tags:
+ - 'v*' # Push events to matching v*, i.e. v1.0, v20.15.10
+
+jobs:
+ build_docker:
+ name: Build docker
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - name: Build Maven Project
+ uses: actions/setup-java@v4
+ with:
+ java-version: '8'
+ distribution: 'temurin'
+ - name: Publish package
+ run: mvn clean package -Pdev -DskipTests -U -e && /bin/cp -rf powerjob-server/powerjob-server-starter/target/*.jar powerjob-server/docker/powerjob-server.jar && /bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar && /bin/cp -rf powerjob-worker-samples/target/*.jar powerjob-worker-samples/powerjob-worker-samples.jar
+
+ # Login
+ - name: Login to Docker Hub
+ uses: docker/login-action@v3
+ with:
+ username: ${{ secrets.DOCKER_USERNAME }}
+ password: ${{ secrets.DOCKER_PASSWORD }}
+
+ - name: Set up QEMU
+ uses: docker/setup-qemu-action@v3
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v3
+
+ - name: Build And Push [powerjob-server]
+ uses: docker/build-push-action@v6
+ with:
+ context: powerjob-server/docker/
+ push: true
+ platforms: linux/amd64,linux/arm64
+ tags: |
+ tjqq/powerjob-server:latest
+ powerjob/powerjob-server:latest
+ tjqq/powerjob-server:${{ env.GITHUB_REF_NAME }}
+ powerjob/powerjob-server:${{ env.GITHUB_REF_NAME }}
+
+ - name: Build And Push [powerjob-agent]
+ uses: docker/build-push-action@v6
+ with:
+ context: powerjob-worker-agent/
+ push: true
+ platforms: linux/amd64,linux/arm64
+ tags: |
+ tjqq/powerjob-agent:latest
+ powerjob/powerjob-agent:latest
+ tjqq/powerjob-agent:${{ env.GITHUB_REF_NAME }}
+ powerjob/powerjob-agent:${{ env.GITHUB_REF_NAME }}
+
+ - name: Build And Push [powerjob-worker-samples]
+ uses: docker/build-push-action@v6
+ with:
+ context: powerjob-worker-samples/
+ push: true
+ platforms: linux/amd64,linux/arm64
+ tags: |
+ tjqq/powerjob-worker-samples:latest
+ powerjob/powerjob-worker-samples:latest
+ tjqq/powerjob-worker-samples:${{ env.GITHUB_REF_NAME }}
+ powerjob/powerjob-worker-samples:${{ env.GITHUB_REF_NAME }}
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
deleted file mode 100644
index c2f99971..00000000
--- a/.github/workflows/maven.yml
+++ /dev/null
@@ -1,38 +0,0 @@
-# This workflow will build a Java project with Maven
-# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven
-
-name: Java CI with Maven
-
-on:
- push:
- branches: [ master ]
- pull_request:
- branches: [ master ]
-
-jobs:
- build:
-
- runs-on: ubuntu-latest
-
- steps:
- - uses: actions/checkout@v2
- - name: Set up JDK 1.8
- uses: actions/setup-java@v1
- with:
- java-version: 1.8
- - name: Build with Maven
- run: mvn -B clean package -Pdev -DskipTests --file pom.xml
- - name: upload build result
- run: mkdir staging && cp powerjob-server/powerjob-server-starter/target/*.jar staging/powerjob-server.jar && cp powerjob-client/target/*.jar staging/powerjob-client.jar && cp powerjob-worker-agent/target/*.jar staging/powerjob-agent.jar
- - uses: actions/upload-artifact@v1
- with:
- name: powerjob-server.jar
- path: staging/powerjob-server.jar
- - uses: actions/upload-artifact@v1
- with:
- name: powerjob-client.jar
- path: staging/powerjob-client.jar
- - uses: actions/upload-artifact@v1
- with:
- name: powerjob-agent.jar
- path: staging/powerjob-agent.jar
diff --git a/.github/workflows/maven_build.yml b/.github/workflows/maven_build.yml
new file mode 100644
index 00000000..f0042b9a
--- /dev/null
+++ b/.github/workflows/maven_build.yml
@@ -0,0 +1,28 @@
+# This workflow will build a Java project with Maven
+# For more information see: https://docs.github.com/zh/actions/use-cases-and-examples/building-and-testing/building-and-testing-java-with-maven
+
+name: Java CI with Maven
+
+on:
+ push:
+ branches: [ master ]
+ pull_request:
+ branches: [ master ]
+
+jobs:
+ build:
+
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - uses: actions/setup-java@v4
+ with:
+ java-version: '8'
+ distribution: 'temurin'
+ - run: mvn -B clean package -Pdev -DskipTests --file pom.xml
+ - run: mkdir staging && cp powerjob-server/powerjob-server-starter/target/*.jar staging/powerjob-server.jar && cp powerjob-client/target/*.jar staging/powerjob-client.jar && cp powerjob-worker-agent/target/*.jar staging/powerjob-agent.jar && cp powerjob-worker-spring-boot-starter/target/*.jar staging/powerjob-worker-spring-boot-starter.jar
+ - uses: actions/upload-artifact@v4
+ with:
+ name: Package
+ path: staging
+
diff --git a/.github/workflows/maven_publish.yml b/.github/workflows/maven_publish.yml
new file mode 100644
index 00000000..f374635a
--- /dev/null
+++ b/.github/workflows/maven_publish.yml
@@ -0,0 +1,22 @@
+name: Publish package to the Maven Central Repository
+on:
+ release:
+ types: [created]
+jobs:
+ publish:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - name: Set up Maven Central Repository
+ uses: actions/setup-java@v4
+ with:
+ java-version: '8'
+ distribution: 'temurin'
+ server-id: ossrh
+ server-username: MAVEN_USERNAME
+ server-password: MAVEN_PASSWORD
+ - name: Publish package
+ run: mvn --batch-mode clean deploy -pl powerjob-worker,powerjob-client,powerjob-worker-spring-boot-starter,powerjob-official-processors,powerjob-worker-agent -DskipTests -Prelease -am
+ env:
+ MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }}
+ MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }}
diff --git a/pom.xml b/pom.xml
index 0f4cbdb3..1863f8a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
tech.powerjob
powerjob
- 5.1.0-bugfix
+ 5.1.1
pom
powerjob
http://www.powerjob.tech
diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml
index 7ac9a906..ddd317bc 100644
--- a/powerjob-client/pom.xml
+++ b/powerjob-client/pom.xml
@@ -5,19 +5,19 @@
powerjob
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
powerjob-client
- 5.1.0-bugfix
+ 5.1.1
jar
5.9.1
1.2.13
1.2.83
- 5.1.0-bugfix
+ 5.1.1
3.2.4
diff --git a/powerjob-client/src/main/java/tech/powerjob/client/IPowerJobClient.java b/powerjob-client/src/main/java/tech/powerjob/client/IPowerJobClient.java
index 0df519b1..e489297c 100644
--- a/powerjob-client/src/main/java/tech/powerjob/client/IPowerJobClient.java
+++ b/powerjob-client/src/main/java/tech/powerjob/client/IPowerJobClient.java
@@ -3,6 +3,7 @@ package tech.powerjob.client;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
import tech.powerjob.common.request.http.SaveWorkflowRequest;
+import tech.powerjob.common.request.query.InstancePageQuery;
import tech.powerjob.common.request.query.JobInfoQuery;
import tech.powerjob.common.response.*;
@@ -50,6 +51,8 @@ public interface IPowerJobClient {
ResultDTO fetchInstanceInfo(Long instanceId);
+ ResultDTO> queryInstanceInfo(InstancePageQuery instancePageQuery);
+
/* ************* Workflow API list ************* */
ResultDTO saveWorkflow(SaveWorkflowRequest request);
diff --git a/powerjob-client/src/main/java/tech/powerjob/client/PowerJobClient.java b/powerjob-client/src/main/java/tech/powerjob/client/PowerJobClient.java
index f2e668ce..3c454516 100644
--- a/powerjob-client/src/main/java/tech/powerjob/client/PowerJobClient.java
+++ b/powerjob-client/src/main/java/tech/powerjob/client/PowerJobClient.java
@@ -17,6 +17,7 @@ import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
import tech.powerjob.common.request.http.SaveWorkflowRequest;
+import tech.powerjob.common.request.query.InstancePageQuery;
import tech.powerjob.common.request.query.JobInfoQuery;
import tech.powerjob.common.response.*;
import tech.powerjob.common.serialize.JsonUtils;
@@ -335,6 +336,13 @@ public class PowerJobClient implements IPowerJobClient, Closeable {
return JSON.parseObject(post, INSTANCE_RESULT_TYPE);
}
+ @Override
+ public ResultDTO> queryInstanceInfo(InstancePageQuery instancePageQuery) {
+ instancePageQuery.setAppIdEq(appId);
+ String post = requestService.request(OpenAPIConstant.QUERY_INSTANCE, PowerRequestBody.newJsonRequestBody(instancePageQuery));
+ return JSON.parseObject(post, PAGE_INSTANCE_RESULT_TYPE);
+ }
+
/* ************* Workflow API list ************* */
/**
diff --git a/powerjob-client/src/main/java/tech/powerjob/client/TypeStore.java b/powerjob-client/src/main/java/tech/powerjob/client/TypeStore.java
index c3c23406..2c19460e 100644
--- a/powerjob-client/src/main/java/tech/powerjob/client/TypeStore.java
+++ b/powerjob-client/src/main/java/tech/powerjob/client/TypeStore.java
@@ -32,6 +32,8 @@ public class TypeStore {
public static final TypeReference>> LIST_INSTANCE_RESULT_TYPE = new TypeReference>>(){};
+ public static final TypeReference>> PAGE_INSTANCE_RESULT_TYPE = new TypeReference>>(){};
+
public static final TypeReference> WF_RESULT_TYPE = new TypeReference>() {};
public static final TypeReference> WF_INSTANCE_RESULT_TYPE = new TypeReference>() {};
diff --git a/powerjob-client/src/test/java/tech/powerjob/client/test/TestClient.java b/powerjob-client/src/test/java/tech/powerjob/client/test/TestClient.java
index ac5658d6..9c1b30c0 100644
--- a/powerjob-client/src/test/java/tech/powerjob/client/test/TestClient.java
+++ b/powerjob-client/src/test/java/tech/powerjob/client/test/TestClient.java
@@ -1,6 +1,7 @@
package tech.powerjob.client.test;
import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -9,6 +10,7 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
+import tech.powerjob.common.request.query.InstancePageQuery;
import tech.powerjob.common.response.InstanceInfoDTO;
import tech.powerjob.common.response.JobInfoDTO;
import tech.powerjob.common.response.ResultDTO;
@@ -113,6 +115,17 @@ class TestClient extends ClientInitializer {
Assertions.assertNotNull(res);
}
+ @Test
+ void testQueryInstanceInfo() {
+ InstancePageQuery instancePageQuery = new InstancePageQuery();
+ instancePageQuery.setJobIdEq(11L);
+ instancePageQuery.setSortBy("actualTriggerTime");
+ instancePageQuery.setAsc(true);
+ instancePageQuery.setPageSize(3);
+ instancePageQuery.setStatusIn(Lists.newArrayList(1,2,5));
+ TestUtils.output(powerJobClient.queryInstanceInfo(instancePageQuery));
+ }
+
@Test
void testStopInstance() {
ResultDTO res = powerJobClient.stopInstance(702482902331424832L);
diff --git a/powerjob-client/src/test/java/tech/powerjob/client/test/TestUtils.java b/powerjob-client/src/test/java/tech/powerjob/client/test/TestUtils.java
new file mode 100644
index 00000000..e09f4071
--- /dev/null
+++ b/powerjob-client/src/test/java/tech/powerjob/client/test/TestUtils.java
@@ -0,0 +1,17 @@
+package tech.powerjob.client.test;
+
+import com.alibaba.fastjson.JSONObject;
+
+/**
+ * TestUtils
+ *
+ * @author tjq
+ * @since 2024/11/21
+ */
+public class TestUtils {
+
+ public static void output(Object v) {
+ String str = JSONObject.toJSONString(v);
+ System.out.println(str);
+ }
+}
diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml
index 3c17297b..ae50dbdf 100644
--- a/powerjob-common/pom.xml
+++ b/powerjob-common/pom.xml
@@ -5,12 +5,12 @@
powerjob
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
powerjob-common
- 5.1.0-bugfix
+ 5.1.1
jar
diff --git a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java
index cf9819a6..97f859f6 100644
--- a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java
+++ b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java
@@ -64,4 +64,6 @@ public class PowerJobDKey {
public static final String WORKER_RUNTIME_SWAP_TASK_SCHEDULE_INTERVAL_MS = "powerjob.worker.swap.scan-interval";
+ public static final String SERVER_TEST_ACCOUNT_USERNAME = "powerjob.server.test-accounts";
+
}
diff --git a/powerjob-common/src/main/java/tech/powerjob/common/enums/ErrorCodes.java b/powerjob-common/src/main/java/tech/powerjob/common/enums/ErrorCodes.java
index db0043c6..dfd42ce5 100644
--- a/powerjob-common/src/main/java/tech/powerjob/common/enums/ErrorCodes.java
+++ b/powerjob-common/src/main/java/tech/powerjob/common/enums/ErrorCodes.java
@@ -49,6 +49,10 @@ public enum ErrorCodes {
* 系统内部异常
*/
SYSTEM_UNKNOWN_ERROR("-500", "SYS_UNKNOWN_ERROR"),
+ /**
+ * 非法参数
+ */
+ ILLEGAL_ARGS_ERROR("-501", "ILLEGAL_ARGS_ERROR"),
/**
* OPENAPI 错误码号段 -10XX
diff --git a/powerjob-common/src/main/java/tech/powerjob/common/exception/PowerJobExceptionLauncher.java b/powerjob-common/src/main/java/tech/powerjob/common/exception/PowerJobExceptionLauncher.java
new file mode 100644
index 00000000..d86e13af
--- /dev/null
+++ b/powerjob-common/src/main/java/tech/powerjob/common/exception/PowerJobExceptionLauncher.java
@@ -0,0 +1,17 @@
+package tech.powerjob.common.exception;
+
+
+import tech.powerjob.common.enums.ErrorCodes;
+
+/**
+ * PowerJobExceptionLauncher
+ *
+ * @author tjq
+ * @since 2024/11/22
+ */
+public class PowerJobExceptionLauncher {
+
+ public PowerJobExceptionLauncher(ErrorCodes errorCode, String message) {
+ throw new PowerJobException(errorCode, message);
+ }
+}
diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/query/InstancePageQuery.java b/powerjob-common/src/main/java/tech/powerjob/common/request/query/InstancePageQuery.java
new file mode 100644
index 00000000..dff4c6f0
--- /dev/null
+++ b/powerjob-common/src/main/java/tech/powerjob/common/request/query/InstancePageQuery.java
@@ -0,0 +1,33 @@
+package tech.powerjob.common.request.query;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+ * 任务实例分页查询
+ *
+ * @author tjq
+ * @since 2024/11/21
+ */
+@Getter
+@Setter
+public class InstancePageQuery extends PowerPageQuery {
+
+ private Long instanceIdEq;
+ private Long instanceIdLt;
+ private Long instanceIdGt;
+
+ private Long jobIdEq;
+
+ private List statusIn;
+
+
+ private Date gmtCreateLt;
+ private Date gmtCreateGt;
+
+ private Date gmtModifiedLt;
+ private Date gmtModifiedGt;
+}
diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/query/PowerPageQuery.java b/powerjob-common/src/main/java/tech/powerjob/common/request/query/PowerPageQuery.java
new file mode 100644
index 00000000..cd65139b
--- /dev/null
+++ b/powerjob-common/src/main/java/tech/powerjob/common/request/query/PowerPageQuery.java
@@ -0,0 +1,41 @@
+package tech.powerjob.common.request.query;
+
+import lombok.Getter;
+import lombok.Setter;
+import tech.powerjob.common.PowerQuery;
+
+import java.io.Serializable;
+
+/**
+ * 分页查询
+ *
+ * @author tjq
+ * @since 2024/11/21
+ */
+@Getter
+@Setter
+public class PowerPageQuery extends PowerQuery implements Serializable {
+
+
+ /* ****************** 分页参数 ****************** */
+ /**
+ * 当前页码
+ */
+ protected Integer index = 0;
+ /**
+ * 页大小
+ */
+ protected Integer pageSize = 10;
+
+ /* ****************** 排序参数 ****************** */
+
+ /**
+ * 排序参数,如 gmtCreate、instanceId
+ */
+ protected String sortBy;
+
+ /**
+ * asc是指定列按升序排列,desc则是指定列按降序排列
+ */
+ protected boolean asc = false;
+}
diff --git a/powerjob-common/src/main/java/tech/powerjob/common/response/PageResult.java b/powerjob-common/src/main/java/tech/powerjob/common/response/PageResult.java
new file mode 100644
index 00000000..a90c5ee5
--- /dev/null
+++ b/powerjob-common/src/main/java/tech/powerjob/common/response/PageResult.java
@@ -0,0 +1,42 @@
+package tech.powerjob.common.response;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * 分页对象
+ *
+ * @author tjq
+ * @since 2020/4/12
+ */
+@Data
+@NoArgsConstructor
+@Accessors(chain = true)
+public class PageResult implements Serializable {
+
+ /**
+ * 当前页数
+ */
+ private int index;
+ /**
+ * 页大小
+ */
+ private int pageSize;
+ /**
+ * 总页数
+ */
+ private int totalPages;
+ /**
+ * 总数据量
+ */
+ private long totalItems;
+ /**
+ * 数据
+ */
+ private List data;
+
+}
diff --git a/powerjob-official-processors/pom.xml b/powerjob-official-processors/pom.xml
index 254f0eaf..efa0739f 100644
--- a/powerjob-official-processors/pom.xml
+++ b/powerjob-official-processors/pom.xml
@@ -5,12 +5,12 @@
powerjob
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
powerjob-official-processors
- 5.1.0-bugfix
+ 5.1.1
jar
@@ -20,7 +20,7 @@
5.9.1
1.2.13
- 5.1.0-bugfix
+ 5.1.1
2.2.224
8.0.28
5.3.31
diff --git a/powerjob-remote/pom.xml b/powerjob-remote/pom.xml
index 5064f6b1..44937c7b 100644
--- a/powerjob-remote/pom.xml
+++ b/powerjob-remote/pom.xml
@@ -5,7 +5,7 @@
powerjob
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
pom
diff --git a/powerjob-remote/powerjob-remote-benchmark/pom.xml b/powerjob-remote/powerjob-remote-benchmark/pom.xml
index 917e91c9..dfdeb76b 100644
--- a/powerjob-remote/powerjob-remote-benchmark/pom.xml
+++ b/powerjob-remote/powerjob-remote-benchmark/pom.xml
@@ -5,7 +5,7 @@
powerjob-remote
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
@@ -21,8 +21,8 @@
1.2.13
2.7.18
- 5.1.0-bugfix
- 5.1.0-bugfix
+ 5.1.1
+ 5.1.1
3.9.0
4.2.9
diff --git a/powerjob-remote/powerjob-remote-framework/pom.xml b/powerjob-remote/powerjob-remote-framework/pom.xml
index 3e647ec0..5e883ed8 100644
--- a/powerjob-remote/powerjob-remote-framework/pom.xml
+++ b/powerjob-remote/powerjob-remote-framework/pom.xml
@@ -5,11 +5,11 @@
powerjob-remote
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
- 5.1.0-bugfix
+ 5.1.1
powerjob-remote-framework
@@ -17,7 +17,7 @@
8
UTF-8
- 5.1.0-bugfix
+ 5.1.1
0.10.2
diff --git a/powerjob-remote/powerjob-remote-impl-akka/pom.xml b/powerjob-remote/powerjob-remote-impl-akka/pom.xml
index 97335b2e..28d7d2ae 100644
--- a/powerjob-remote/powerjob-remote-impl-akka/pom.xml
+++ b/powerjob-remote/powerjob-remote-impl-akka/pom.xml
@@ -5,19 +5,19 @@
powerjob-remote
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
powerjob-remote-impl-akka
- 5.1.0-bugfix
+ 5.1.1
8
8
UTF-8
- 5.1.0-bugfix
+ 5.1.1
2.6.13
diff --git a/powerjob-remote/powerjob-remote-impl-http/pom.xml b/powerjob-remote/powerjob-remote-impl-http/pom.xml
index e911aef8..572b2731 100644
--- a/powerjob-remote/powerjob-remote-impl-http/pom.xml
+++ b/powerjob-remote/powerjob-remote-impl-http/pom.xml
@@ -5,12 +5,12 @@
powerjob-remote
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
powerjob-remote-impl-http
- 5.1.0-bugfix
+ 5.1.1
8
@@ -18,7 +18,7 @@
UTF-8
4.3.7
- 5.1.0-bugfix
+ 5.1.1
diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml
index 22bf07ac..1071da86 100644
--- a/powerjob-server/pom.xml
+++ b/powerjob-server/pom.xml
@@ -5,12 +5,12 @@
powerjob
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
powerjob-server
- 5.1.0-bugfix
+ 5.1.1
pom
@@ -51,9 +51,9 @@
3.0.10
9.2.1
- 5.1.0-bugfix
- 5.1.0-bugfix
- 5.1.0-bugfix
+ 5.1.1
+ 5.1.1
+ 5.1.1
1.6.14
3.17.1
1.12.665
diff --git a/powerjob-server/powerjob-server-auth/pom.xml b/powerjob-server/powerjob-server-auth/pom.xml
index 5e232bec..17ce8bbe 100644
--- a/powerjob-server/powerjob-server-auth/pom.xml
+++ b/powerjob-server/powerjob-server-auth/pom.xml
@@ -6,7 +6,7 @@
tech.powerjob
powerjob-server
- 5.1.0-bugfix
+ 5.1.1
4.0.0
diff --git a/powerjob-server/powerjob-server-common/pom.xml b/powerjob-server/powerjob-server-common/pom.xml
index 13738427..6cab2dae 100644
--- a/powerjob-server/powerjob-server-common/pom.xml
+++ b/powerjob-server/powerjob-server-common/pom.xml
@@ -5,7 +5,7 @@
powerjob-server
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
../pom.xml
4.0.0
diff --git a/powerjob-server/powerjob-server-core/pom.xml b/powerjob-server/powerjob-server-core/pom.xml
index c6b2278e..5b7d7cc9 100644
--- a/powerjob-server/powerjob-server-core/pom.xml
+++ b/powerjob-server/powerjob-server-core/pom.xml
@@ -5,7 +5,7 @@
powerjob-server
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
../pom.xml
4.0.0
diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java
index 2f09a052..d9a8ed24 100644
--- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java
+++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java
@@ -3,8 +3,10 @@ package tech.powerjob.server.core.instance;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
-import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
@@ -12,8 +14,10 @@ import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerQueryInstanceStatusReq;
import tech.powerjob.common.request.ServerStopInstanceReq;
+import tech.powerjob.common.request.query.InstancePageQuery;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.response.InstanceInfoDTO;
+import tech.powerjob.common.response.PageResult;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.common.constants.InstanceType;
import tech.powerjob.server.common.module.WorkerInfo;
@@ -27,8 +31,8 @@ import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
-import tech.powerjob.server.remote.transporter.impl.ServerURLFactory;
import tech.powerjob.server.remote.transporter.TransportService;
+import tech.powerjob.server.remote.transporter.impl.ServerURLFactory;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import java.util.Date;
@@ -228,12 +232,21 @@ public class InstanceService {
}
}
- public List queryInstanceInfo(PowerQuery powerQuery) {
- return instanceInfoRepository
- .findAll(QueryConvertUtils.toSpecification(powerQuery))
- .stream()
- .map(InstanceService::directConvert)
- .collect(Collectors.toList());
+ public PageResult queryInstanceInfo(InstancePageQuery instancePageQuery) {
+ Specification specification = QueryConvertUtils.toSpecification(instancePageQuery);
+ Pageable pageable = QueryConvertUtils.toPageable(instancePageQuery);
+ Page instanceInfoDOPage = instanceInfoRepository.findAll(specification, pageable);
+
+ PageResult ret = new PageResult<>();
+ List instanceInfoDTOList = instanceInfoDOPage.get().map(InstanceService::directConvert).collect(Collectors.toList());
+
+ ret.setData(instanceInfoDTOList)
+ .setIndex(instanceInfoDOPage.getNumber())
+ .setPageSize(instanceInfoDOPage.getSize())
+ .setTotalPages(instanceInfoDOPage.getTotalPages())
+ .setTotalItems(instanceInfoDOPage.getTotalElements());
+
+ return ret;
}
/**
diff --git a/powerjob-server/powerjob-server-extension/pom.xml b/powerjob-server/powerjob-server-extension/pom.xml
index 776d9f82..7a0fa142 100644
--- a/powerjob-server/powerjob-server-extension/pom.xml
+++ b/powerjob-server/powerjob-server-extension/pom.xml
@@ -5,7 +5,7 @@
powerjob-server
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
../pom.xml
4.0.0
diff --git a/powerjob-server/powerjob-server-migrate/pom.xml b/powerjob-server/powerjob-server-migrate/pom.xml
index d7b8cfbe..c539fce3 100644
--- a/powerjob-server/powerjob-server-migrate/pom.xml
+++ b/powerjob-server/powerjob-server-migrate/pom.xml
@@ -5,7 +5,7 @@
powerjob-server
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
../pom.xml
4.0.0
diff --git a/powerjob-server/powerjob-server-monitor/pom.xml b/powerjob-server/powerjob-server-monitor/pom.xml
index f93b02fd..720974d6 100644
--- a/powerjob-server/powerjob-server-monitor/pom.xml
+++ b/powerjob-server/powerjob-server-monitor/pom.xml
@@ -5,7 +5,7 @@
powerjob-server
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
../pom.xml
4.0.0
diff --git a/powerjob-server/powerjob-server-persistence/pom.xml b/powerjob-server/powerjob-server-persistence/pom.xml
index aeaf95a9..3d8eec79 100644
--- a/powerjob-server/powerjob-server-persistence/pom.xml
+++ b/powerjob-server/powerjob-server-persistence/pom.xml
@@ -5,7 +5,7 @@
powerjob-server
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
../pom.xml
4.0.0
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/QueryConvertUtils.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/QueryConvertUtils.java
index 3f4fe41d..ce62c4f3 100644
--- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/QueryConvertUtils.java
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/QueryConvertUtils.java
@@ -1,14 +1,18 @@
package tech.powerjob.server.persistence;
import com.alibaba.fastjson.JSONArray;
-import tech.powerjob.common.exception.PowerJobException;
-import tech.powerjob.common.PowerQuery;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
+import tech.powerjob.common.PowerQuery;
+import tech.powerjob.common.exception.PowerJobException;
+import tech.powerjob.common.request.query.PowerPageQuery;
-import javax.persistence.criteria.*;
+import javax.persistence.criteria.Predicate;
import java.lang.reflect.Field;
import java.util.List;
@@ -86,6 +90,26 @@ public class QueryConvertUtils {
};
}
+ public static Pageable toPageable(PowerPageQuery powerPageQuery) {
+
+ Sort sorter = null;
+ String sortBy = powerPageQuery.getSortBy();
+ if (StringUtils.isNoneEmpty(sortBy)) {
+ sorter = Sort.by(sortBy);
+ if (powerPageQuery.isAsc()) {
+ sorter.ascending();
+ } else {
+ sorter.descending();
+ }
+ }
+
+ if (sorter == null) {
+ return PageRequest.of(powerPageQuery.getIndex(), powerPageQuery.getPageSize());
+ }
+
+ return PageRequest.of(powerPageQuery.getIndex(), powerPageQuery.getPageSize(), sorter);
+ }
+
public static String convertLikeParams(Object o) {
String s = (String) o;
if (!s.startsWith("%")) {
diff --git a/powerjob-server/powerjob-server-remote/pom.xml b/powerjob-server/powerjob-server-remote/pom.xml
index 82f294cc..cf8d4b52 100644
--- a/powerjob-server/powerjob-server-remote/pom.xml
+++ b/powerjob-server/powerjob-server-remote/pom.xml
@@ -5,7 +5,7 @@
powerjob-server
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
../pom.xml
4.0.0
diff --git a/powerjob-server/powerjob-server-starter/pom.xml b/powerjob-server/powerjob-server-starter/pom.xml
index 2e5ff247..b3d7927d 100644
--- a/powerjob-server/powerjob-server-starter/pom.xml
+++ b/powerjob-server/powerjob-server-starter/pom.xml
@@ -5,7 +5,7 @@
powerjob-server
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
../pom.xml
4.0.0
diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/openapi/OpenAPIController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/openapi/OpenAPIController.java
index 40a9abf3..37e4b2bb 100644
--- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/openapi/OpenAPIController.java
+++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/openapi/OpenAPIController.java
@@ -7,13 +7,13 @@ import org.springframework.web.bind.annotation.*;
import tech.powerjob.client.module.AppAuthRequest;
import tech.powerjob.client.module.AppAuthResult;
import tech.powerjob.common.OpenAPIConstant;
-import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.enums.ErrorCodes;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
import tech.powerjob.common.request.http.SaveWorkflowRequest;
+import tech.powerjob.common.request.query.InstancePageQuery;
import tech.powerjob.common.request.query.JobInfoQuery;
import tech.powerjob.common.response.*;
import tech.powerjob.server.core.instance.InstanceService;
@@ -183,7 +183,7 @@ public class OpenAPIController {
}
@PostMapping(OpenAPIConstant.QUERY_INSTANCE)
- public ResultDTO> queryInstance(@RequestBody PowerQuery powerQuery) {
+ public ResultDTO> queryInstance(@RequestBody InstancePageQuery powerQuery) {
return ResultDTO.success(instanceService.queryInstanceInfo(powerQuery));
}
diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java
index 36d048ef..3206548c 100644
--- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java
+++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java
@@ -17,6 +17,8 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import tech.powerjob.common.enums.ErrorCodes;
+import tech.powerjob.common.exception.PowerJobExceptionLauncher;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
@@ -80,14 +82,24 @@ public class AppInfoController {
@ApiPermission(name = "App-Save", roleScope = RoleScope.APP, dynamicPermissionPlugin = ModifyOrCreateDynamicPermission.class, grandPermissionPlugin = SaveAppGrantPermissionPlugin.class)
public ResultDTO saveAppInfo(@RequestBody ModifyAppInfoRequest req) {
+ // 根据 ns code 填充 namespaceId(自动化创建过程中,固定的 namespace-code 对用户更友好)
+ if (StringUtils.isNotEmpty(req.getNamespaceCode())) {
+ namespaceWebService.findByCode(req.getNamespaceCode()).ifPresent(x -> req.setNamespaceId(x.getId()));
+ }
+
req.valid();
AppInfoDO appInfoDO;
Long id = req.getId();
if (id == null) {
+
+ // 前置校验,防止部分没加唯一索引的 DB 重复创建记录导致异常
+ appInfoRepository.findByAppName(req.getAppName()).ifPresent(x -> new PowerJobExceptionLauncher(ErrorCodes.ILLEGAL_ARGS_ERROR, String.format("App[%s] already exists", req.getAppName())));
+
appInfoDO = new AppInfoDO();
appInfoDO.setGmtCreate(new Date());
appInfoDO.setCreator(LoginUserHolder.getUserId());
+
} else {
appInfoDO = appInfoService.findById(id, false).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by id:" + id));
diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/ModifyAppInfoRequest.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/ModifyAppInfoRequest.java
index 4669cf62..7d86fbd0 100644
--- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/ModifyAppInfoRequest.java
+++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/ModifyAppInfoRequest.java
@@ -17,7 +17,11 @@ public class ModifyAppInfoRequest {
private Long id;
private String appName;
+ /**
+ * namespace 唯一标识,任选其一传递即可
+ */
private Long namespaceId;
+ private String namespaceCode;
private String oldPassword;
private String password;
diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/NamespaceWebService.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/NamespaceWebService.java
index 07f9d218..8ff706aa 100644
--- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/NamespaceWebService.java
+++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/NamespaceWebService.java
@@ -22,6 +22,8 @@ public interface NamespaceWebService {
Optional findById(Long id);
+ Optional findByCode(String code);
+
Page list(QueryNamespaceRequest queryNamespaceRequest);
List listAll();
diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/NamespaceWebServiceImpl.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/NamespaceWebServiceImpl.java
index e504bc81..e376adf2 100644
--- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/NamespaceWebServiceImpl.java
+++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/NamespaceWebServiceImpl.java
@@ -8,12 +8,14 @@ import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
+import tech.powerjob.common.enums.ErrorCodes;
+import tech.powerjob.common.enums.SwitchableStatus;
import tech.powerjob.common.exception.PowerJobException;
+import tech.powerjob.common.exception.PowerJobExceptionLauncher;
import tech.powerjob.server.auth.LoginUserHolder;
import tech.powerjob.server.auth.RoleScope;
import tech.powerjob.server.auth.service.WebAuthService;
import tech.powerjob.server.common.SJ;
-import tech.powerjob.common.enums.SwitchableStatus;
import tech.powerjob.server.persistence.QueryConvertUtils;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.model.NamespaceDO;
@@ -57,6 +59,9 @@ public class NamespaceWebServiceImpl implements NamespaceWebService {
boolean isCreate = id == null;
if (isCreate) {
+
+ namespaceRepository.findByCode(req.getCode()).ifPresent(x -> new PowerJobExceptionLauncher(ErrorCodes.ILLEGAL_ARGS_ERROR, String.format("namespace[%s] already exists", req.getCode())));
+
namespaceDO = new NamespaceDO();
namespaceDO.setGmtCreate(new Date());
@@ -109,6 +114,14 @@ public class NamespaceWebServiceImpl implements NamespaceWebService {
return namespaceRepository.findById(id);
}
+ @Override
+ public Optional findByCode(String code) {
+ if (StringUtils.isEmpty(code)) {
+ return Optional.empty();
+ }
+ return namespaceRepository.findByCode(code);
+ }
+
@Override
public Page list(QueryNamespaceRequest queryNamespaceRequest) {
String codeLike = queryNamespaceRequest.getCodeLike();
diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/PwjbUserWebServiceImplImpl.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/PwjbUserWebServiceImplImpl.java
index 356ff200..20e752d7 100644
--- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/PwjbUserWebServiceImplImpl.java
+++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/PwjbUserWebServiceImplImpl.java
@@ -1,14 +1,17 @@
package tech.powerjob.server.web.service.impl;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
+import tech.powerjob.common.PowerJobDKey;
+import tech.powerjob.common.enums.ErrorCodes;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
-import tech.powerjob.common.enums.ErrorCodes;
-import tech.powerjob.server.auth.common.PowerJobAuthException;
import tech.powerjob.common.utils.DigestUtils;
+import tech.powerjob.server.auth.common.PowerJobAuthException;
+import tech.powerjob.server.common.SJ;
import tech.powerjob.server.persistence.remote.model.PwjbUserInfoDO;
import tech.powerjob.server.persistence.remote.repository.PwjbUserInfoRepository;
import tech.powerjob.server.web.request.ChangePasswordRequest;
@@ -88,7 +91,12 @@ public class PwjbUserWebServiceImplImpl implements PwjbUserWebService {
}
// 测试账号特殊处理
- if (NOT_ALLOWED_CHANGE_PASSWORD_ACCOUNTS.contains(username)) {
+ Set testAccounts = Sets.newHashSet(NOT_ALLOWED_CHANGE_PASSWORD_ACCOUNTS);
+ String testAccountsStr = System.getProperty(PowerJobDKey.SERVER_TEST_ACCOUNT_USERNAME);
+ if (StringUtils.isNotEmpty(testAccountsStr)) {
+ testAccounts.addAll(Lists.newArrayList(SJ.COMMA_SPLITTER.split(testAccountsStr)));
+ }
+ if (testAccounts.contains(username)) {
throw new IllegalArgumentException("this account not allowed change the password");
}
diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml
index aef2a46a..e2a5b3e6 100644
--- a/powerjob-worker-agent/pom.xml
+++ b/powerjob-worker-agent/pom.xml
@@ -5,24 +5,24 @@
powerjob
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
powerjob-worker-agent
- 5.1.0-bugfix
+ 5.1.1
jar
- 5.1.0-bugfix
+ 5.1.1
1.2.13
4.3.2
5.3.31
2.3.4.RELEASE
- 5.1.0-bugfix
+ 5.1.1
8.0.28
diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml
index 24aa9f16..79f24de8 100644
--- a/powerjob-worker-samples/pom.xml
+++ b/powerjob-worker-samples/pom.xml
@@ -5,22 +5,22 @@
powerjob
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
powerjob-worker-samples
- 5.1.0-bugfix
+ 5.1.1
2.7.18
- 5.1.0-bugfix
+ 5.1.1
1.2.83
- 5.1.0-bugfix
+ 5.1.1
true
- 5.1.0-bugfix
+ 5.1.1
diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/IdleBugTestProcessor.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/IdleBugTestProcessor.java
new file mode 100644
index 00000000..abe9f9c2
--- /dev/null
+++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/IdleBugTestProcessor.java
@@ -0,0 +1,46 @@
+package tech.powerjob.samples.processors.test;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import tech.powerjob.common.utils.CommonUtils;
+import tech.powerjob.worker.core.processor.ProcessResult;
+import tech.powerjob.worker.core.processor.TaskContext;
+import tech.powerjob.worker.core.processor.TaskResult;
+import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * 测试长时间执行的任务 idle 导致 reduce 不执行
+ *
+ * @author tjq
+ * @since 2024/11/21
+ */
+@Slf4j
+@Component
+public class IdleBugTestProcessor implements MapReduceProcessor {
+
+ @Override
+ public ProcessResult process(TaskContext context) throws Exception {
+ if (isRootTask()) {
+ map(Lists.newArrayList("1", "2", "3", "4", "5", "6", "7"), "L1_TASK");
+ return new ProcessResult(true, "MAP_SUCCESS");
+ }
+
+ Object subTask = context.getSubTask();
+ log.info("[IdleBugTestProcessor] subTask:={}, start to process!", subTask);
+
+ // 同步修改 idle 阈值
+ CommonUtils.easySleep(ThreadLocalRandom.current().nextInt(40001, 60000));
+ log.info("[IdleBugTestProcessor] subTask:={}, finished process", subTask);
+ return new ProcessResult(true, "SUCCESS_" + subTask);
+ }
+
+ @Override
+ public ProcessResult reduce(TaskContext context, List taskResults) {
+ log.info("[IdleBugTestProcessor] [REDUCE] REDUCE!!!");
+ return new ProcessResult(true, "SUCCESS");
+ }
+}
diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml
index 094454c5..e18be923 100644
--- a/powerjob-worker-spring-boot-starter/pom.xml
+++ b/powerjob-worker-spring-boot-starter/pom.xml
@@ -5,16 +5,16 @@
powerjob
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
powerjob-worker-spring-boot-starter
- 5.1.0-bugfix
+ 5.1.1
jar
- 5.1.0-bugfix
+ 5.1.1
2.7.18
diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml
index ae74a553..4938d56e 100644
--- a/powerjob-worker/pom.xml
+++ b/powerjob-worker/pom.xml
@@ -5,12 +5,12 @@
powerjob
tech.powerjob
- 5.1.0-bugfix
+ 5.1.1
4.0.0
powerjob-worker
- 5.1.0-bugfix
+ 5.1.1
jar
@@ -21,10 +21,10 @@
1.2.13
- 5.1.0-bugfix
- 5.1.0-bugfix
- 5.1.0-bugfix
- 5.1.0-bugfix
+ 5.1.1
+ 5.1.1
+ 5.1.1
+ 5.1.1
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java
index 3d7650cd..c9daebd6 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java
@@ -265,14 +265,21 @@ public class ProcessorTracker {
} else {
long idleTime = System.currentTimeMillis() - lastIdleTime;
if (idleTime > MAX_IDLE_TIME) {
- log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime);
- // 不可靠通知,如果该请求失败,则整个任务处理集群缺失一个 ProcessorTracker,影响可接受
- ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
- statusReportReq.setAddress(workerRuntime.getWorkerAddress());
- TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
- destroy();
- return;
+ boolean shouldDestroyWhenIdle = shouldDestroyWhenIdle();
+ log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, shouldDestroyWhenIdle: {}", instanceId, idleTime, shouldDestroyWhenIdle);
+
+ if (shouldDestroyWhenIdle) {
+
+ log.warn("[ProcessorTracker-{}] it's time to tell TaskTracker and then destroy self.", instanceId);
+
+ // 不可靠通知,如果该请求失败,则整个任务处理集群缺失一个 ProcessorTracker,影响可接受
+ ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
+ statusReportReq.setAddress(workerRuntime.getWorkerAddress());
+ TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
+ destroy();
+ return;
+ }
}
}
}
@@ -300,6 +307,22 @@ public class ProcessorTracker {
}
+ /**
+ * 空闲的时候是否需要自我销毁
+ * @return true or false
+ */
+ private boolean shouldDestroyWhenIdle() {
+ /*
+ https://github.com/PowerJob/PowerJob/issues/1033
+ map 情况下,如果子任务执行较长,任务末期可能出现某一个节点的任务仍在执行,其他机器都已经无任务可执行,被 idle 逻辑关闭节点。如果不幸在‘生成reduce任务后并派发前’关闭了 TaskTracker 所在节点的 PT,那 reduce 任务就会直接失败
+ 解决方案:同 TT 节点的 PT,本身不存在分布式不一致问题,因此不需要 idle 直接关闭 PT 的机制
+ */
+ if (taskTrackerAddress.equalsIgnoreCase(workerRuntime.getWorkerAddress())) {
+ return false;
+ }
+ return true;
+ }
+
/**
* 计算线程池大小
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java
index 2ad7ac80..20a88a3b 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java
@@ -232,6 +232,8 @@ public class CommonTaskTracker extends HeavyTaskTracker {
} else {
+ log.info("[TaskTracker-{}] all subTask has done, start to create final task", instanceId);
+
// 不存在,代表前置任务刚刚执行完毕,需要创建 lastTask,最终任务必须在本机执行!
TaskDO newLastTask = new TaskDO();
newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
@@ -294,8 +296,10 @@ public class CommonTaskTracker extends HeavyTaskTracker {
// 6.2 定期检查 -> 重新执行被派发到宕机ProcessorTracker上的任务
List disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
if (!disconnectedPTs.isEmpty()) {
- log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
- if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) {
+ // 广播任务节点丢失后若直接移除 IP 重试,后续会派发到其他节点,导致重复执行,因此此处不能重试 https://github.com/PowerJob/PowerJob/issues/1003
+ boolean needRetry = !ExecuteType.BROADCAST.equals(executeType);
+ log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}, needRetry: {}.", instanceId, disconnectedPTs, needRetry);
+ if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, needRetry)) {
ptStatusHolder.remove(disconnectedPTs);
log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs);
}
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
index 443cbfd1..8977e2bf 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
@@ -41,7 +41,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
/**
* 负责管理 JobInstance 的运行,主要包括任务的派发(MR可能存在大量的任务)和状态的更新
@@ -298,7 +300,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
* @param heartbeatReq ProcessorTracker(任务的执行管理器)发来的心跳包,包含了其当前状态
*/
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
- log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
+ log.debug("[TaskTracker-{}] receive PT's heartbeat: {}", instanceId, heartbeatReq);
ptStatusHolder.updateStatus(heartbeatReq);
// 上报空闲,检查是否已经接收到全部该 ProcessorTracker 负责的任务
@@ -476,8 +478,10 @@ public abstract class HeavyTaskTracker extends TaskTracker {
// 3. 避免大查询,分批派发任务
long currentDispatchNum = 0;
+ LongAdder realDispatchNum = new LongAdder();
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
AtomicInteger index = new AtomicInteger(0);
+ AtomicBoolean skipThisRound = new AtomicBoolean(false);
// 4. 循环查询数据库,获取需要派发的任务
while (maxDispatchNum > currentDispatchNum) {
@@ -490,8 +494,14 @@ public abstract class HeavyTaskTracker extends TaskTracker {
// 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address
String ptAddress = task.getAddress();
if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
- if (taskNeedByPassTaskTracker(availablePtIps)) {
+ if (taskNeedByPassTaskTracker()) {
+ int loopTime = 0;
do {
+ loopTime++;
+ if (loopTime > 2) {
+ skipThisRound.set(true);
+ return;
+ }
ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
} while (workerRuntime.getWorkerAddress().equals(ptAddress));
} else {
@@ -499,29 +509,28 @@ public abstract class HeavyTaskTracker extends TaskTracker {
}
}
dispatchTask(task, ptAddress);
+ realDispatchNum.increment();
});
+ if (skipThisRound.get()) {
+ log.warn("[TaskTracker-{}] The cluster has no available workers other than master, so this round dispatch is skipped.", instanceId);
+ break;
+ }
+
// 数量不足 或 查询失败,则终止循环
if (needDispatchTasks.size() < dbQueryLimit) {
break;
}
}
- log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
+ long realDispatchNumL = realDispatchNum.longValue();
+ if (realDispatchNumL > 0) {
+ log.info("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, realDispatchNum, stopwatch.stop());
+ }
}
- /**
- * padding的生效条件: 在map || mapReduce 的情况下, 且是该appId的worker是 非单机运行时,才生效。
- * fix: 当该appId的worker是单机运行 且 padding时, 导致Dispatcher分发任务处于死循环中, 致使无法分发任务,状态一直为运行中,
- * 且该线程不能通过停止任务的方式去停止,只能通过重启该work实例的方式释放该线程。
- */
- private boolean taskNeedByPassTaskTracker(List availablePtIps) {
+ private boolean taskNeedByPassTaskTracker() {
if (ExecuteType.MAP.equals(executeType) || ExecuteType.MAP_REDUCE.equals(executeType)) {
-
- if (availablePtIps.size() <= 1) {
- return false;
- }
-
return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior());
}
return false;
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/MethodBasicProcessor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/MethodBasicProcessor.java
index b148591f..8912cf8f 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/MethodBasicProcessor.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/MethodBasicProcessor.java
@@ -24,6 +24,12 @@ class MethodBasicProcessor implements BasicProcessor {
public ProcessResult process(TaskContext context) throws Exception {
try {
Object result = method.invoke(bean, context);
+
+ // 支持直接返回 ProcessResult https://github.com/PowerJob/PowerJob/issues/798
+ if (result instanceof ProcessResult) {
+ return (ProcessResult) result;
+ }
+
return new ProcessResult(true, JsonUtils.toJSONString(result));
} catch (InvocationTargetException ite) {
ExceptionUtils.rethrow(ite.getTargetException());