Merge branch '5.1.1_v2'

This commit is contained in:
tjq 2024-12-07 20:43:22 +08:00
commit e6264fc9a4
52 changed files with 559 additions and 172 deletions

View File

@ -1,40 +0,0 @@
name: Docker Image CI
on:
push:
branches: [ master ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Build the Docker image
run: mvn clean package -Pdev -DskipTests -U -e && /bin/cp -rf powerjob-server/powerjob-server-starter/target/*.jar powerjob-server/docker/powerjob-server.jar && /bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar && /bin/cp -rf powerjob-worker-samples/target/*.jar powerjob-worker-samples/powerjob-worker-samples.jar
- uses: docker/build-push-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
repository: tjqq/powerjob-server
tag_with_ref: true
tags: latest
path: powerjob-server/docker/
- uses: docker/build-push-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
repository: tjqq/powerjob-agent
tag_with_ref: true
tags: latest
path: powerjob-worker-agent/
- uses: docker/build-push-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
repository: tjqq/powerjob-worker-samples
tag_with_ref: true
tags: latest
path: powerjob-worker-samples/

69
.github/workflows/docker_publish.yml vendored Normal file
View File

@ -0,0 +1,69 @@
name: build_docker
on:
push:
branches: [master]
tags:
- 'v*' # Push events to matching v*, i.e. v1.0, v20.15.10
jobs:
build_docker:
name: Build docker
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Build Maven Project
uses: actions/setup-java@v4
with:
java-version: '8'
distribution: 'temurin'
- name: Publish package
run: mvn clean package -Pdev -DskipTests -U -e && /bin/cp -rf powerjob-server/powerjob-server-starter/target/*.jar powerjob-server/docker/powerjob-server.jar && /bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar && /bin/cp -rf powerjob-worker-samples/target/*.jar powerjob-worker-samples/powerjob-worker-samples.jar
# Login
- name: Login to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build And Push [powerjob-server]
uses: docker/build-push-action@v6
with:
context: powerjob-server/docker/
push: true
platforms: linux/amd64,linux/arm64
tags: |
tjqq/powerjob-server:latest
powerjob/powerjob-server:latest
tjqq/powerjob-server:${{ env.GITHUB_REF_NAME }}
powerjob/powerjob-server:${{ env.GITHUB_REF_NAME }}
- name: Build And Push [powerjob-agent]
uses: docker/build-push-action@v6
with:
context: powerjob-worker-agent/
push: true
platforms: linux/amd64,linux/arm64
tags: |
tjqq/powerjob-agent:latest
powerjob/powerjob-agent:latest
tjqq/powerjob-agent:${{ env.GITHUB_REF_NAME }}
powerjob/powerjob-agent:${{ env.GITHUB_REF_NAME }}
- name: Build And Push [powerjob-worker-samples]
uses: docker/build-push-action@v6
with:
context: powerjob-worker-samples/
push: true
platforms: linux/amd64,linux/arm64
tags: |
tjqq/powerjob-worker-samples:latest
powerjob/powerjob-worker-samples:latest
tjqq/powerjob-worker-samples:${{ env.GITHUB_REF_NAME }}
powerjob/powerjob-worker-samples:${{ env.GITHUB_REF_NAME }}

View File

@ -1,38 +0,0 @@
# This workflow will build a Java project with Maven
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven
name: Java CI with Maven
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Build with Maven
run: mvn -B clean package -Pdev -DskipTests --file pom.xml
- name: upload build result
run: mkdir staging && cp powerjob-server/powerjob-server-starter/target/*.jar staging/powerjob-server.jar && cp powerjob-client/target/*.jar staging/powerjob-client.jar && cp powerjob-worker-agent/target/*.jar staging/powerjob-agent.jar
- uses: actions/upload-artifact@v1
with:
name: powerjob-server.jar
path: staging/powerjob-server.jar
- uses: actions/upload-artifact@v1
with:
name: powerjob-client.jar
path: staging/powerjob-client.jar
- uses: actions/upload-artifact@v1
with:
name: powerjob-agent.jar
path: staging/powerjob-agent.jar

28
.github/workflows/maven_build.yml vendored Normal file
View File

@ -0,0 +1,28 @@
# This workflow will build a Java project with Maven
# For more information see: https://docs.github.com/zh/actions/use-cases-and-examples/building-and-testing/building-and-testing-java-with-maven
name: Java CI with Maven
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-java@v4
with:
java-version: '8'
distribution: 'temurin'
- run: mvn -B clean package -Pdev -DskipTests --file pom.xml
- run: mkdir staging && cp powerjob-server/powerjob-server-starter/target/*.jar staging/powerjob-server.jar && cp powerjob-client/target/*.jar staging/powerjob-client.jar && cp powerjob-worker-agent/target/*.jar staging/powerjob-agent.jar && cp powerjob-worker-spring-boot-starter/target/*.jar staging/powerjob-worker-spring-boot-starter.jar
- uses: actions/upload-artifact@v4
with:
name: Package
path: staging

22
.github/workflows/maven_publish.yml vendored Normal file
View File

@ -0,0 +1,22 @@
name: Publish package to the Maven Central Repository
on:
release:
types: [created]
jobs:
publish:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Maven Central Repository
uses: actions/setup-java@v4
with:
java-version: '8'
distribution: 'temurin'
server-id: ossrh
server-username: MAVEN_USERNAME
server-password: MAVEN_PASSWORD
- name: Publish package
run: mvn --batch-mode clean deploy -pl powerjob-worker,powerjob-client,powerjob-worker-spring-boot-starter,powerjob-official-processors,powerjob-worker-agent -DskipTests -Prelease -am
env:
MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }}
MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }}

View File

@ -6,7 +6,7 @@
<groupId>tech.powerjob</groupId>
<artifactId>powerjob</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<packaging>pom</packaging>
<name>powerjob</name>
<url>http://www.powerjob.tech</url>

View File

@ -5,19 +5,19 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<packaging>jar</packaging>
<properties>
<junit.version>5.9.1</junit.version>
<logback.version>1.2.13</logback.version>
<fastjson.version>1.2.83</fastjson.version>
<powerjob.common.version>5.1.0-bugfix</powerjob.common.version>
<powerjob.common.version>5.1.1</powerjob.common.version>
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
</properties>

View File

@ -3,6 +3,7 @@ package tech.powerjob.client;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
import tech.powerjob.common.request.http.SaveWorkflowRequest;
import tech.powerjob.common.request.query.InstancePageQuery;
import tech.powerjob.common.request.query.JobInfoQuery;
import tech.powerjob.common.response.*;
@ -50,6 +51,8 @@ public interface IPowerJobClient {
ResultDTO<InstanceInfoDTO> fetchInstanceInfo(Long instanceId);
ResultDTO<PageResult<InstanceInfoDTO>> queryInstanceInfo(InstancePageQuery instancePageQuery);
/* ************* Workflow API list ************* */
ResultDTO<Long> saveWorkflow(SaveWorkflowRequest request);

View File

@ -17,6 +17,7 @@ import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
import tech.powerjob.common.request.http.SaveWorkflowRequest;
import tech.powerjob.common.request.query.InstancePageQuery;
import tech.powerjob.common.request.query.JobInfoQuery;
import tech.powerjob.common.response.*;
import tech.powerjob.common.serialize.JsonUtils;
@ -335,6 +336,13 @@ public class PowerJobClient implements IPowerJobClient, Closeable {
return JSON.parseObject(post, INSTANCE_RESULT_TYPE);
}
@Override
public ResultDTO<PageResult<InstanceInfoDTO>> queryInstanceInfo(InstancePageQuery instancePageQuery) {
instancePageQuery.setAppIdEq(appId);
String post = requestService.request(OpenAPIConstant.QUERY_INSTANCE, PowerRequestBody.newJsonRequestBody(instancePageQuery));
return JSON.parseObject(post, PAGE_INSTANCE_RESULT_TYPE);
}
/* ************* Workflow API list ************* */
/**

View File

@ -32,6 +32,8 @@ public class TypeStore {
public static final TypeReference<ResultDTO<List<InstanceInfoDTO>>> LIST_INSTANCE_RESULT_TYPE = new TypeReference<ResultDTO<List<InstanceInfoDTO>>>(){};
public static final TypeReference<ResultDTO<PageResult<InstanceInfoDTO>>> PAGE_INSTANCE_RESULT_TYPE = new TypeReference<ResultDTO<PageResult<InstanceInfoDTO>>>(){};
public static final TypeReference<ResultDTO<WorkflowInfoDTO>> WF_RESULT_TYPE = new TypeReference<ResultDTO<WorkflowInfoDTO>>() {};
public static final TypeReference<ResultDTO<WorkflowInstanceInfoDTO>> WF_INSTANCE_RESULT_TYPE = new TypeReference<ResultDTO<WorkflowInstanceInfoDTO>>() {};

View File

@ -1,6 +1,7 @@
package tech.powerjob.client.test;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -9,6 +10,7 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.request.query.InstancePageQuery;
import tech.powerjob.common.response.InstanceInfoDTO;
import tech.powerjob.common.response.JobInfoDTO;
import tech.powerjob.common.response.ResultDTO;
@ -113,6 +115,17 @@ class TestClient extends ClientInitializer {
Assertions.assertNotNull(res);
}
@Test
void testQueryInstanceInfo() {
InstancePageQuery instancePageQuery = new InstancePageQuery();
instancePageQuery.setJobIdEq(11L);
instancePageQuery.setSortBy("actualTriggerTime");
instancePageQuery.setAsc(true);
instancePageQuery.setPageSize(3);
instancePageQuery.setStatusIn(Lists.newArrayList(1,2,5));
TestUtils.output(powerJobClient.queryInstanceInfo(instancePageQuery));
}
@Test
void testStopInstance() {
ResultDTO<Void> res = powerJobClient.stopInstance(702482902331424832L);

View File

@ -0,0 +1,17 @@
package tech.powerjob.client.test;
import com.alibaba.fastjson.JSONObject;
/**
* TestUtils
*
* @author tjq
* @since 2024/11/21
*/
public class TestUtils {
public static void output(Object v) {
String str = JSONObject.toJSONString(v);
System.out.println(str);
}
}

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<packaging>jar</packaging>
<properties>

View File

@ -64,4 +64,6 @@ public class PowerJobDKey {
public static final String WORKER_RUNTIME_SWAP_TASK_SCHEDULE_INTERVAL_MS = "powerjob.worker.swap.scan-interval";
public static final String SERVER_TEST_ACCOUNT_USERNAME = "powerjob.server.test-accounts";
}

View File

@ -49,6 +49,10 @@ public enum ErrorCodes {
* 系统内部异常
*/
SYSTEM_UNKNOWN_ERROR("-500", "SYS_UNKNOWN_ERROR"),
/**
* 非法参数
*/
ILLEGAL_ARGS_ERROR("-501", "ILLEGAL_ARGS_ERROR"),
/**
* OPENAPI 错误码号段 -10XX

View File

@ -0,0 +1,17 @@
package tech.powerjob.common.exception;
import tech.powerjob.common.enums.ErrorCodes;
/**
* PowerJobExceptionLauncher
*
* @author tjq
* @since 2024/11/22
*/
public class PowerJobExceptionLauncher {
public PowerJobExceptionLauncher(ErrorCodes errorCode, String message) {
throw new PowerJobException(errorCode, message);
}
}

View File

@ -0,0 +1,33 @@
package tech.powerjob.common.request.query;
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
import java.util.List;
/**
* 任务实例分页查询
*
* @author tjq
* @since 2024/11/21
*/
@Getter
@Setter
public class InstancePageQuery extends PowerPageQuery {
private Long instanceIdEq;
private Long instanceIdLt;
private Long instanceIdGt;
private Long jobIdEq;
private List<Integer> statusIn;
private Date gmtCreateLt;
private Date gmtCreateGt;
private Date gmtModifiedLt;
private Date gmtModifiedGt;
}

View File

@ -0,0 +1,41 @@
package tech.powerjob.common.request.query;
import lombok.Getter;
import lombok.Setter;
import tech.powerjob.common.PowerQuery;
import java.io.Serializable;
/**
* 分页查询
*
* @author tjq
* @since 2024/11/21
*/
@Getter
@Setter
public class PowerPageQuery extends PowerQuery implements Serializable {
/* ****************** 分页参数 ****************** */
/**
* 当前页码
*/
protected Integer index = 0;
/**
* 页大小
*/
protected Integer pageSize = 10;
/* ****************** 排序参数 ****************** */
/**
* 排序参数 gmtCreateinstanceId
*/
protected String sortBy;
/**
* asc是指定列按升序排列desc则是指定列按降序排列
*/
protected boolean asc = false;
}

View File

@ -0,0 +1,42 @@
package tech.powerjob.common.response;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.List;
/**
* 分页对象
*
* @author tjq
* @since 2020/4/12
*/
@Data
@NoArgsConstructor
@Accessors(chain = true)
public class PageResult<T> implements Serializable {
/**
* 当前页数
*/
private int index;
/**
* 页大小
*/
private int pageSize;
/**
* 总页数
*/
private int totalPages;
/**
* 总数据量
*/
private long totalItems;
/**
* 数据
*/
private List<T> data;
}

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-official-processors</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<packaging>jar</packaging>
<properties>
@ -20,7 +20,7 @@
<!-- 不会被打包的部分scope 只能是 test 或 provide -->
<junit.version>5.9.1</junit.version>
<logback.version>1.2.13</logback.version>
<powerjob.worker.version>5.1.0-bugfix</powerjob.worker.version>
<powerjob.worker.version>5.1.1</powerjob.worker.version>
<h2.db.version>2.2.224</h2.db.version>
<mysql.version>8.0.28</mysql.version>
<spring.version>5.3.31</spring.version>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -21,8 +21,8 @@
<logback.version>1.2.13</logback.version>
<springboot.version>2.7.18</springboot.version>
<powerjob-remote-impl-http.version>5.1.0-bugfix</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>5.1.0-bugfix</powerjob-remote-impl-akka.version>
<powerjob-remote-impl-http.version>5.1.1</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>5.1.1</powerjob-remote-impl-akka.version>
<gatling.version>3.9.0</gatling.version>
<gatling-maven-plugin.version>4.2.9</gatling-maven-plugin.version>

View File

@ -5,11 +5,11 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<artifactId>powerjob-remote-framework</artifactId>
<properties>
@ -17,7 +17,7 @@
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<powerjob-common.version>5.1.0-bugfix</powerjob-common.version>
<powerjob-common.version>5.1.1</powerjob-common.version>
<reflections.version>0.10.2</reflections.version>

View File

@ -5,19 +5,19 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-akka</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<powerjob-remote-framework.version>5.1.0-bugfix</powerjob-remote-framework.version>
<powerjob-remote-framework.version>5.1.1</powerjob-remote-framework.version>
<akka.version>2.6.13</akka.version>
</properties>

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-http</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
@ -18,7 +18,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<vertx.version>4.3.7</vertx.version>
<powerjob-remote-framework.version>5.1.0-bugfix</powerjob-remote-framework.version>
<powerjob-remote-framework.version>5.1.1</powerjob-remote-framework.version>
</properties>
<dependencies>

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<packaging>pom</packaging>
<modules>
@ -51,9 +51,9 @@
<groovy.version>3.0.10</groovy.version>
<cron-utils.version>9.2.1</cron-utils.version>
<powerjob-common.version>5.1.0-bugfix</powerjob-common.version>
<powerjob-remote-impl-http.version>5.1.0-bugfix</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>5.1.0-bugfix</powerjob-remote-impl-akka.version>
<powerjob-common.version>5.1.1</powerjob-common.version>
<powerjob-remote-impl-http.version>5.1.1</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>5.1.1</powerjob-remote-impl-akka.version>
<springdoc-openapi-ui.version>1.6.14</springdoc-openapi-ui.version>
<aliyun-sdk-oss.version>3.17.1</aliyun-sdk-oss.version>
<aws-java-sdk-s3.version>1.12.665</aws-java-sdk-s3.version>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -3,8 +3,10 @@ package tech.powerjob.server.core.instance;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
@ -12,8 +14,10 @@ import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerQueryInstanceStatusReq;
import tech.powerjob.common.request.ServerStopInstanceReq;
import tech.powerjob.common.request.query.InstancePageQuery;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.response.InstanceInfoDTO;
import tech.powerjob.common.response.PageResult;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.common.constants.InstanceType;
import tech.powerjob.server.common.module.WorkerInfo;
@ -27,8 +31,8 @@ import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.remote.transporter.impl.ServerURLFactory;
import tech.powerjob.server.remote.transporter.TransportService;
import tech.powerjob.server.remote.transporter.impl.ServerURLFactory;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import java.util.Date;
@ -228,12 +232,21 @@ public class InstanceService {
}
}
public List<InstanceInfoDTO> queryInstanceInfo(PowerQuery powerQuery) {
return instanceInfoRepository
.findAll(QueryConvertUtils.toSpecification(powerQuery))
.stream()
.map(InstanceService::directConvert)
.collect(Collectors.toList());
public PageResult<InstanceInfoDTO> queryInstanceInfo(InstancePageQuery instancePageQuery) {
Specification<InstanceInfoDO> specification = QueryConvertUtils.toSpecification(instancePageQuery);
Pageable pageable = QueryConvertUtils.toPageable(instancePageQuery);
Page<InstanceInfoDO> instanceInfoDOPage = instanceInfoRepository.findAll(specification, pageable);
PageResult<InstanceInfoDTO> ret = new PageResult<>();
List<InstanceInfoDTO> instanceInfoDTOList = instanceInfoDOPage.get().map(InstanceService::directConvert).collect(Collectors.toList());
ret.setData(instanceInfoDTOList)
.setIndex(instanceInfoDOPage.getNumber())
.setPageSize(instanceInfoDOPage.getSize())
.setTotalPages(instanceInfoDOPage.getTotalPages())
.setTotalItems(instanceInfoDOPage.getTotalElements());
return ret;
}
/**

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -1,14 +1,18 @@
package tech.powerjob.server.persistence;
import com.alibaba.fastjson.JSONArray;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.PowerQuery;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.request.query.PowerPageQuery;
import javax.persistence.criteria.*;
import javax.persistence.criteria.Predicate;
import java.lang.reflect.Field;
import java.util.List;
@ -86,6 +90,26 @@ public class QueryConvertUtils {
};
}
public static Pageable toPageable(PowerPageQuery powerPageQuery) {
Sort sorter = null;
String sortBy = powerPageQuery.getSortBy();
if (StringUtils.isNoneEmpty(sortBy)) {
sorter = Sort.by(sortBy);
if (powerPageQuery.isAsc()) {
sorter.ascending();
} else {
sorter.descending();
}
}
if (sorter == null) {
return PageRequest.of(powerPageQuery.getIndex(), powerPageQuery.getPageSize());
}
return PageRequest.of(powerPageQuery.getIndex(), powerPageQuery.getPageSize(), sorter);
}
public static String convertLikeParams(Object o) {
String s = (String) o;
if (!s.startsWith("%")) {

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -7,13 +7,13 @@ import org.springframework.web.bind.annotation.*;
import tech.powerjob.client.module.AppAuthRequest;
import tech.powerjob.client.module.AppAuthResult;
import tech.powerjob.common.OpenAPIConstant;
import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.enums.ErrorCodes;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
import tech.powerjob.common.request.http.SaveWorkflowRequest;
import tech.powerjob.common.request.query.InstancePageQuery;
import tech.powerjob.common.request.query.JobInfoQuery;
import tech.powerjob.common.response.*;
import tech.powerjob.server.core.instance.InstanceService;
@ -183,7 +183,7 @@ public class OpenAPIController {
}
@PostMapping(OpenAPIConstant.QUERY_INSTANCE)
public ResultDTO<List<InstanceInfoDTO>> queryInstance(@RequestBody PowerQuery powerQuery) {
public ResultDTO<PageResult<InstanceInfoDTO>> queryInstance(@RequestBody InstancePageQuery powerQuery) {
return ResultDTO.success(instanceService.queryInstanceInfo(powerQuery));
}

View File

@ -17,6 +17,8 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import tech.powerjob.common.enums.ErrorCodes;
import tech.powerjob.common.exception.PowerJobExceptionLauncher;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
@ -80,14 +82,24 @@ public class AppInfoController {
@ApiPermission(name = "App-Save", roleScope = RoleScope.APP, dynamicPermissionPlugin = ModifyOrCreateDynamicPermission.class, grandPermissionPlugin = SaveAppGrantPermissionPlugin.class)
public ResultDTO<AppInfoVO> saveAppInfo(@RequestBody ModifyAppInfoRequest req) {
// 根据 ns code 填充 namespaceId自动化创建过程中固定的 namespace-code 对用户更友好
if (StringUtils.isNotEmpty(req.getNamespaceCode())) {
namespaceWebService.findByCode(req.getNamespaceCode()).ifPresent(x -> req.setNamespaceId(x.getId()));
}
req.valid();
AppInfoDO appInfoDO;
Long id = req.getId();
if (id == null) {
// 前置校验防止部分没加唯一索引的 DB 重复创建记录导致异常
appInfoRepository.findByAppName(req.getAppName()).ifPresent(x -> new PowerJobExceptionLauncher(ErrorCodes.ILLEGAL_ARGS_ERROR, String.format("App[%s] already exists", req.getAppName())));
appInfoDO = new AppInfoDO();
appInfoDO.setGmtCreate(new Date());
appInfoDO.setCreator(LoginUserHolder.getUserId());
} else {
appInfoDO = appInfoService.findById(id, false).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by id:" + id));

View File

@ -17,7 +17,11 @@ public class ModifyAppInfoRequest {
private Long id;
private String appName;
/**
* namespace 唯一标识任选其一传递即可
*/
private Long namespaceId;
private String namespaceCode;
private String oldPassword;
private String password;

View File

@ -22,6 +22,8 @@ public interface NamespaceWebService {
Optional<NamespaceDO> findById(Long id);
Optional<NamespaceDO> findByCode(String code);
Page<NamespaceDO> list(QueryNamespaceRequest queryNamespaceRequest);
List<NamespaceDO> listAll();

View File

@ -8,12 +8,14 @@ import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.ErrorCodes;
import tech.powerjob.common.enums.SwitchableStatus;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.exception.PowerJobExceptionLauncher;
import tech.powerjob.server.auth.LoginUserHolder;
import tech.powerjob.server.auth.RoleScope;
import tech.powerjob.server.auth.service.WebAuthService;
import tech.powerjob.server.common.SJ;
import tech.powerjob.common.enums.SwitchableStatus;
import tech.powerjob.server.persistence.QueryConvertUtils;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.model.NamespaceDO;
@ -57,6 +59,9 @@ public class NamespaceWebServiceImpl implements NamespaceWebService {
boolean isCreate = id == null;
if (isCreate) {
namespaceRepository.findByCode(req.getCode()).ifPresent(x -> new PowerJobExceptionLauncher(ErrorCodes.ILLEGAL_ARGS_ERROR, String.format("namespace[%s] already exists", req.getCode())));
namespaceDO = new NamespaceDO();
namespaceDO.setGmtCreate(new Date());
@ -109,6 +114,14 @@ public class NamespaceWebServiceImpl implements NamespaceWebService {
return namespaceRepository.findById(id);
}
@Override
public Optional<NamespaceDO> findByCode(String code) {
if (StringUtils.isEmpty(code)) {
return Optional.empty();
}
return namespaceRepository.findByCode(code);
}
@Override
public Page<NamespaceDO> list(QueryNamespaceRequest queryNamespaceRequest) {
String codeLike = queryNamespaceRequest.getCodeLike();

View File

@ -1,14 +1,17 @@
package tech.powerjob.server.web.service.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.enums.ErrorCodes;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.enums.ErrorCodes;
import tech.powerjob.server.auth.common.PowerJobAuthException;
import tech.powerjob.common.utils.DigestUtils;
import tech.powerjob.server.auth.common.PowerJobAuthException;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.persistence.remote.model.PwjbUserInfoDO;
import tech.powerjob.server.persistence.remote.repository.PwjbUserInfoRepository;
import tech.powerjob.server.web.request.ChangePasswordRequest;
@ -88,7 +91,12 @@ public class PwjbUserWebServiceImplImpl implements PwjbUserWebService {
}
// 测试账号特殊处理
if (NOT_ALLOWED_CHANGE_PASSWORD_ACCOUNTS.contains(username)) {
Set<String> testAccounts = Sets.newHashSet(NOT_ALLOWED_CHANGE_PASSWORD_ACCOUNTS);
String testAccountsStr = System.getProperty(PowerJobDKey.SERVER_TEST_ACCOUNT_USERNAME);
if (StringUtils.isNotEmpty(testAccountsStr)) {
testAccounts.addAll(Lists.newArrayList(SJ.COMMA_SPLITTER.split(testAccountsStr)));
}
if (testAccounts.contains(username)) {
throw new IllegalArgumentException("this account not allowed change the password");
}

View File

@ -5,24 +5,24 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>5.1.0-bugfix</powerjob.worker.version>
<powerjob.worker.version>5.1.1</powerjob.worker.version>
<logback.version>1.2.13</logback.version>
<picocli.version>4.3.2</picocli.version>
<spring.version>5.3.31</spring.version>
<spring.boot.version>2.3.4.RELEASE</spring.boot.version>
<powerjob.official.processors.version>5.1.0-bugfix</powerjob.official.processors.version>
<powerjob.official.processors.version>5.1.1</powerjob.official.processors.version>
<!-- dependency for dynamic sql processor -->
<mysql.version>8.0.28</mysql.version>

View File

@ -5,22 +5,22 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<properties>
<springboot.version>2.7.18</springboot.version>
<powerjob.worker.starter.version>5.1.0-bugfix</powerjob.worker.starter.version>
<powerjob.worker.starter.version>5.1.1</powerjob.worker.starter.version>
<fastjson.version>1.2.83</fastjson.version>
<powerjob.official.processors.version>5.1.0-bugfix</powerjob.official.processors.version>
<powerjob.official.processors.version>5.1.1</powerjob.official.processors.version>
<!-- 部署时跳过该module -->
<maven.deploy.skip>true</maven.deploy.skip>
<powerjob-client.version>5.1.0-bugfix</powerjob-client.version>
<powerjob-client.version>5.1.1</powerjob-client.version>
</properties>
<dependencies>

View File

@ -0,0 +1,46 @@
package tech.powerjob.samples.processors.test;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* <a href="https://github.com/PowerJob/PowerJob/issues/1033">测试长时间执行的任务 idle 导致 reduce 不执行</a>
*
* @author tjq
* @since 2024/11/21
*/
@Slf4j
@Component
public class IdleBugTestProcessor implements MapReduceProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
if (isRootTask()) {
map(Lists.newArrayList("1", "2", "3", "4", "5", "6", "7"), "L1_TASK");
return new ProcessResult(true, "MAP_SUCCESS");
}
Object subTask = context.getSubTask();
log.info("[IdleBugTestProcessor] subTask:={}, start to process!", subTask);
// 同步修改 idle 阈值
CommonUtils.easySleep(ThreadLocalRandom.current().nextInt(40001, 60000));
log.info("[IdleBugTestProcessor] subTask:={}, finished process", subTask);
return new ProcessResult(true, "SUCCESS_" + subTask);
}
@Override
public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
log.info("[IdleBugTestProcessor] [REDUCE] REDUCE!!!");
return new ProcessResult(true, "SUCCESS");
}
}

View File

@ -5,16 +5,16 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>5.1.0-bugfix</powerjob.worker.version>
<powerjob.worker.version>5.1.1</powerjob.worker.version>
<springboot.version>2.7.18</springboot.version>
</properties>

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.1</version>
<packaging>jar</packaging>
<properties>
@ -21,10 +21,10 @@
<logback.version>1.2.13</logback.version>
<powerjob-common.version>5.1.0-bugfix</powerjob-common.version>
<powerjob-remote-framework.version>5.1.0-bugfix</powerjob-remote-framework.version>
<powerjob-remote-impl-akka.version>5.1.0-bugfix</powerjob-remote-impl-akka.version>
<powerjob-remote-impl-http.version>5.1.0-bugfix</powerjob-remote-impl-http.version>
<powerjob-common.version>5.1.1</powerjob-common.version>
<powerjob-remote-framework.version>5.1.1</powerjob-remote-framework.version>
<powerjob-remote-impl-akka.version>5.1.1</powerjob-remote-impl-akka.version>
<powerjob-remote-impl-http.version>5.1.1</powerjob-remote-impl-http.version>
</properties>
<dependencies>

View File

@ -265,7 +265,13 @@ public class ProcessorTracker {
} else {
long idleTime = System.currentTimeMillis() - lastIdleTime;
if (idleTime > MAX_IDLE_TIME) {
log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime);
boolean shouldDestroyWhenIdle = shouldDestroyWhenIdle();
log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, shouldDestroyWhenIdle: {}", instanceId, idleTime, shouldDestroyWhenIdle);
if (shouldDestroyWhenIdle) {
log.warn("[ProcessorTracker-{}] it's time to tell TaskTracker and then destroy self.", instanceId);
// 不可靠通知如果该请求失败则整个任务处理集群缺失一个 ProcessorTracker影响可接受
ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
@ -276,6 +282,7 @@ public class ProcessorTracker {
}
}
}
}
// 上报状态之前先重新发送失败的任务只要有结果堆积就不上报状态 PT 认为该 TT 失联然后重试相关任务
while (!statusReportRetryQueue.isEmpty()) {
@ -300,6 +307,22 @@ public class ProcessorTracker {
}
/**
* 空闲的时候是否需要自我销毁
* @return true or false
*/
private boolean shouldDestroyWhenIdle() {
/*
https://github.com/PowerJob/PowerJob/issues/1033
map 情况下如果子任务执行较长任务末期可能出现某一个节点的任务仍在执行其他机器都已经无任务可执行 idle 逻辑关闭节点如果不幸在生成reduce任务后并派发前关闭了 TaskTracker 所在节点的 PT reduce 任务就会直接失败
解决方案 TT 节点的 PT本身不存在分布式不一致问题因此不需要 idle 直接关闭 PT 的机制
*/
if (taskTrackerAddress.equalsIgnoreCase(workerRuntime.getWorkerAddress())) {
return false;
}
return true;
}
/**
* 计算线程池大小

View File

@ -232,6 +232,8 @@ public class CommonTaskTracker extends HeavyTaskTracker {
} else {
log.info("[TaskTracker-{}] all subTask has done, start to create final task", instanceId);
// 不存在代表前置任务刚刚执行完毕需要创建 lastTask最终任务必须在本机执行
TaskDO newLastTask = new TaskDO();
newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
@ -294,8 +296,10 @@ public class CommonTaskTracker extends HeavyTaskTracker {
// 6.2 定期检查 -> 重新执行被派发到宕机ProcessorTracker上的任务
List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
if (!disconnectedPTs.isEmpty()) {
log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) {
// 广播任务节点丢失后若直接移除 IP 重试后续会派发到其他节点导致重复执行因此此处不能重试 https://github.com/PowerJob/PowerJob/issues/1003
boolean needRetry = !ExecuteType.BROADCAST.equals(executeType);
log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}, needRetry: {}.", instanceId, disconnectedPTs, needRetry);
if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, needRetry)) {
ptStatusHolder.remove(disconnectedPTs);
log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs);
}

View File

@ -41,7 +41,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
/**
* 负责管理 JobInstance 的运行主要包括任务的派发MR可能存在大量的任务和状态的更新
@ -298,7 +300,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
* @param heartbeatReq ProcessorTracker任务的执行管理器发来的心跳包包含了其当前状态
*/
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
log.debug("[TaskTracker-{}] receive PT's heartbeat: {}", instanceId, heartbeatReq);
ptStatusHolder.updateStatus(heartbeatReq);
// 上报空闲检查是否已经接收到全部该 ProcessorTracker 负责的任务
@ -476,8 +478,10 @@ public abstract class HeavyTaskTracker extends TaskTracker {
// 3. 避免大查询分批派发任务
long currentDispatchNum = 0;
LongAdder realDispatchNum = new LongAdder();
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
AtomicInteger index = new AtomicInteger(0);
AtomicBoolean skipThisRound = new AtomicBoolean(false);
// 4. 循环查询数据库获取需要派发的任务
while (maxDispatchNum > currentDispatchNum) {
@ -490,8 +494,14 @@ public abstract class HeavyTaskTracker extends TaskTracker {
// 获取 ProcessorTracker 地址如果 Task 中自带了 Address则使用该 Address
String ptAddress = task.getAddress();
if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
if (taskNeedByPassTaskTracker(availablePtIps)) {
if (taskNeedByPassTaskTracker()) {
int loopTime = 0;
do {
loopTime++;
if (loopTime > 2) {
skipThisRound.set(true);
return;
}
ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
} while (workerRuntime.getWorkerAddress().equals(ptAddress));
} else {
@ -499,29 +509,28 @@ public abstract class HeavyTaskTracker extends TaskTracker {
}
}
dispatchTask(task, ptAddress);
realDispatchNum.increment();
});
if (skipThisRound.get()) {
log.warn("[TaskTracker-{}] The cluster has no available workers other than master, so this round dispatch is skipped.", instanceId);
break;
}
// 数量不足 查询失败则终止循环
if (needDispatchTasks.size() < dbQueryLimit) {
break;
}
}
log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
long realDispatchNumL = realDispatchNum.longValue();
if (realDispatchNumL > 0) {
log.info("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, realDispatchNum, stopwatch.stop());
}
}
/**
* padding的生效条件 在map || mapReduce 的情况下 且是该appId的worker是 非单机运行时才生效
* fix: 当该appId的worker是单机运行 padding时 导致Dispatcher分发任务处于死循环中 致使无法分发任务状态一直为运行中
* 且该线程不能通过停止任务的方式去停止只能通过重启该work实例的方式释放该线程
*/
private boolean taskNeedByPassTaskTracker(List<String> availablePtIps) {
private boolean taskNeedByPassTaskTracker() {
if (ExecuteType.MAP.equals(executeType) || ExecuteType.MAP_REDUCE.equals(executeType)) {
if (availablePtIps.size() <= 1) {
return false;
}
return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior());
}
return false;

View File

@ -24,6 +24,12 @@ class MethodBasicProcessor implements BasicProcessor {
public ProcessResult process(TaskContext context) throws Exception {
try {
Object result = method.invoke(bean, context);
// 支持直接返回 ProcessResult https://github.com/PowerJob/PowerJob/issues/798
if (result instanceof ProcessResult) {
return (ProcessResult) result;
}
return new ProcessResult(true, JsonUtils.toJSONString(result));
} catch (InvocationTargetException ite) {
ExceptionUtils.rethrow(ite.getTargetException());