[release] v3.4.4

commit 74fc5edb1f
Author: tjq
Date: 2021-01-17 17:19:32 +08:00
48 changed files with 726 additions and 150 deletions

View File

@@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
-Copyright [yyyy] [name of copyright owner]
+Copyright [2021] [PowerJob]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@@ -1,4 +1,4 @@
-English | [简体中文](./README_zhCN.md)
+### English | [简体中文](./README_zhCN.md)
<p align="center">
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/logo.png" alt="PowerJob" title="PowerJob" width="557"/>
@@ -11,73 +11,59 @@ English | [简体中文](./README_zhCN.md)
<a href="https://github.com/PowerJob/PowerJob/blob/master/LICENSE"><img src="https://img.shields.io/github/license/KFCFans/PowerJob" alt="LICENSE"></a>
</p>
-- Have you ever wondered how cron jobs could be organized orderly?
-- Have you ever felt upset about tasks that carry complex dependencies?
-- Have you ever felt helpless when scheduled tasks suddenly terminated without any warning?
-- Have you ever felt depressed when batches of business tasks need to be processed in a distributed manner?
-Well, PowerJob is there for you, it is the choice of a new generation. It is a powerful, business-oriented scheduling framework that provides distributed computing ability. Based on Akka architecture, it makes everything about scheduling easier. With just a few steps, PowerJob can be deployed and work for you! Refer to [PowerJob Introduction](https://www.yuque.com/powerjob/en/introduce) for detailed information.
+[PowerJob](https://github.com/PowerJob/PowerJob) is an open-source distributed computing and job scheduling framework which allows developers to easily schedule tasks in their own application.
# Introduction
### Features
-- Simple to use: PowerJob provides a friendly front-end Web that allows developers to visually manage tasks, monitor tasks, and view logs online.
-- Complete timing strategy: PowerJob supports four different scheduling strategies, including CRON expression, fixed frequency timing, fixed delay timing, as well as the Open API.
-- Various execution modes: PowerJob supports four execution modes: stand-alone, broadcast, Map, and MapReduce. **It's worth mentioning the Map and MapReduce modes. With several lines of code, developers could take full advantage of PowerJob's distributed computing ability**.
-- Complete workflow support: PowerJob supports DAG (directed acyclic graph) based online task configuration. Developers could arrange tasks on the console, while data could be transferred among tasks on the flow.
-- Extensive executor support: PowerJob supports multiple processors, including Spring Beans, ordinary Java objects, Shell, Python and so on.
-- Simple in dependency: PowerJob aims to be simple in dependency. The only required dependency is a database (MySQL / Oracle / MS SQLServer ...), with MongoDB being an optional extra for storing large log files.
-- High availability and performance: Unlike traditional job-scheduling frameworks that rely on database locks, the PowerJob server is lock-free. PowerJob supports unlimited horizontal expansion. It's easy to achieve high availability and performance by deploying as many PowerJob server instances as you need.
-- Quick failover and recovery support: Whenever any task fails, the PowerJob server retries according to the configured strategy. As long as there are enough nodes in the cluster, the failed tasks can eventually complete successfully.
+- **Friendly UI:** [Front-end](http://try.powerjob.tech/#/welcome?appName=powerjob-agent-test&password=123) page is provided and developers can manage their tasks, monitor the status, check the logs online, etc.
+- **Abundant Timing Strategies:** Four timing strategies are supported, including CRON expression, fixed rate, fixed delay and OpenAPI, which allows you to define your own scheduling policies, such as delaying execution.
+- **Multiple Execution Modes:** Four execution modes are supported, including stand-alone, broadcast, Map and MapReduce. Distributed computing resources can be utilized in MapReduce mode, try the magic out [here](https://www.yuque.com/powerjob/en/za1d96#9YOnV)!
+- **Workflow (DAG) Support:** Both job dependency management and data communications between jobs are supported.
+- **Extensive Processor Support:** Developers can write their processors in Java, Shell and Python; multilingual scheduling via HTTP will be supported subsequently.
+- **Powerful Disaster Tolerance:** As long as there are enough computing nodes, configurable retry policies make it possible for your task to be executed and finished successfully.
+- **High Availability & High Performance:** PowerJob supports unlimited horizontal expansion. It's easy to achieve high availability and performance by deploying as many PowerJob server and worker nodes as needed.
### Applicable scenes
-- Scenarios with timed tasks: such as full synchronization of data at midnight, or generating business reports at a desired time.
-- Scenarios that require all machines to run tasks simultaneously: such as log cleanup.
-- Scenarios that require distributed processing: for example, a large amount of data requires updating, while stand-alone execution takes quite a lot of time. The Map/MapReduce mode could be applied, in which the workers join the cluster for the PowerJob server to dispatch, speeding up the time-consuming process and therefore improving the computing ability of the whole cluster.
-- **Scenarios with delayed tasks**: for instance, disposal of overdue orders.
+- Timed tasks, for example, allocating e-coupons at 9 AM every morning.
+- Broadcast tasks, for example, broadcasting to the cluster to clear logs.
+- MapReduce tasks, for example, speeding up certain jobs like updating large amounts of data.
+- Delayed tasks, for example, processing overdue orders.
+- Customized tasks, triggered with [OpenAPI](https://www.yuque.com/powerjob/en/openapi).
-### Design goals
-PowerJob aims to be an enterprise scheduling middleware. By deploying PowerJob-server as the scheduling center,
-all the applications could gain scheduling and distributed computing ability by relying on PowerJob-worker.
### Online trial
-Trial address: [Online Trial Address](http://try.powerjob.tech/)
-Application name: powerjob-agent-test
-Application password: 123
+- Address: [try.powerjob.tech](http://try.powerjob.tech/#/welcome?appName=powerjob-agent-test&password=123)
+- Recommended to read the documentation first: [here](https://www.yuque.com/powerjob/en/trial)
-### Comparison with similar products
-| | QuartZ | PowerJob |
-| ---------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ |
-| Timing type | CRON | **CRON, fixed frequency, fixed delay, OpenAPI** |
-| Task type | Built-in Java | **Built-in Java, external Java (JVM Container), Shell, Python and other scripts** |
-| Distributed strategy | Unsupported | **MapReduce dynamic sharding** |
-| Online task management | Unsupported | **Supported** |
-| Online logging | Unsupported | **Supported** |
-| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | **Lock-free design, high performance without upper limit** |
-| Alarm monitoring | Unsupported | **Email, WebHook, DingTalk. An interface is provided for customization.** |
-| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | **Any relational database (MySQL, Oracle ...) supported by Spring Data JPA** |
-| Workflow | Unsupported | **Supported** |
-# Document
+# Documents
**[Docs](https://www.yuque.com/powerjob/en/introduce)**
-**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)**
+**[中文文档](https://www.yuque.com/powerjob/guidence/intro)**
-# User Registration
+# Known Users
-[Click to register as PowerJob user and contribute to PowerJob!](https://github.com/PowerJob/PowerJob/issues/6)
+[Click to register as PowerJob user!](https://github.com/PowerJob/PowerJob/issues/6)
ღ( ´・ᴗ・\` )ღ Many thanks to the following registered users. ღ( ´・ᴗ・\` )ღ
<p style="text-align: center">
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/user.png" alt="PowerJob User" title="PowerJob User"/>
</p>
+# License
+PowerJob is released under Apache License 2.0. Please refer to [License](./LICENSE) for details.
# Others
-- Welcome to the Gitter Community: [LINK](https://gitter.im/PowerJob/community)
-- PowerJob is permanently open source software (Apache License, Version 2.0), please feel free to try, deploy and put into production!
-- Welcome to contribute to PowerJob, both Pull Requests and Issues are precious.
-- Please STAR PowerJob if it is valuable. ~ =  ̄ω ̄ =
-- Do you need any help or want to propose suggestions? Please raise GitHub issues or contact the author @KFCFans -> `tengjiqi@gmail.com` directly.
-- Look forward to your opinions. Response may be late but not denied.
+- Any developer interested in getting more involved in PowerJob may join our [Gitter Community](https://gitter.im/PowerJob/community) and make [contributions](https://github.com/PowerJob/PowerJob/pulls)!
+- Reach out to me through email **tengjiqi@gmail.com**. Any issues or questions are welcomed on [Issues](https://github.com/PowerJob/PowerJob/issues).

View File

@@ -1,4 +1,4 @@
-[English](./README.md) | 简体中文
+### [English](./README.md) | 简体中文
<p align="center">
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/logo.png" alt="PowerJob" title="PowerJob" width="557"/>
@@ -34,11 +34,8 @@ PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架
PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server,旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。
### 在线试用
-试用地址:[try.powerjob.tech](http://try.powerjob.tech/)
-试用应用名称:powerjob-agent-test
-控制台密码:123
-[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn)
+* 试用地址:[try.powerjob.tech](http://try.powerjob.tech/#/welcome?appName=powerjob-agent-test&password=123)
+* [建议先阅读使用教程了解 PowerJob 的概念和基本用法](https://www.yuque.com/powerjob/guidence/trial)
### 同类产品对比
| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob |
@@ -55,11 +52,9 @@ PowerJob 的设计目标为企业级的分布式任务调度平台,即成为
# 官方文档
-**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)**
+**[中文文档](https://www.yuque.com/powerjob/guidence/intro)**
-**[Document](https://www.yuque.com/powerjob/en/xrdoqw)**
+**[Docs](https://www.yuque.com/powerjob/en/introduce)**
-PS:感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献!
# 接入登记
[点击进行接入登记,为 PowerJob 的发展贡献自己的力量!](https://github.com/PowerJob/PowerJob/issues/6)

View File

@@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
-<version>3.4.3</version>
+<version>3.4.4</version>
<packaging>jar</packaging>
<properties>
<junit.version>5.6.1</junit.version>
<fastjson.version>1.2.68</fastjson.version>
-<powerjob.common.version>3.4.3</powerjob.common.version>
+<powerjob.common.version>3.4.4</powerjob.common.version>
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
</properties>

View File

@@ -1,12 +1,10 @@
package com.github.kfcfans.powerjob.client;
import com.alibaba.fastjson.JSONObject;
-import com.github.kfcfans.powerjob.common.InstanceStatus;
-import com.github.kfcfans.powerjob.common.OmsConstant;
-import com.github.kfcfans.powerjob.common.OpenAPIConstant;
-import com.github.kfcfans.powerjob.common.PowerJobException;
+import com.github.kfcfans.powerjob.common.*;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
+import com.github.kfcfans.powerjob.common.request.query.JobInfoQuery;
import com.github.kfcfans.powerjob.common.response.*;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
@@ -111,7 +109,7 @@ public class OhMyClient {
public ResultDTO<Long> saveJob(SaveJobInfoRequest request) {
request.setAppId(appId);
-MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
+MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
String json = JSONObject.toJSONString(request);
String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json));
return JSONObject.parseObject(post, LONG_RESULT_TYPE);
@@ -131,6 +129,31 @@ public class OhMyClient {
return JSONObject.parseObject(post, JOB_RESULT_TYPE);
}
+/**
+ * Query all JobInfo
+ * @return All JobInfo
+ */
+public ResultDTO<List<JobInfoDTO>> fetchAllJob() {
+    RequestBody body = new FormBody.Builder()
+            .add("appId", appId.toString())
+            .build();
+    String post = postHA(OpenAPIConstant.FETCH_ALL_JOB, body);
+    return JSONObject.parseObject(post, LIST_JOB_RESULT_TYPE);
+}
+/**
+ * Query JobInfo by PowerQuery
+ * @param powerQuery JobQuery
+ * @return JobInfo
+ */
+public ResultDTO<List<JobInfoDTO>> queryJob(JobInfoQuery powerQuery) {
+    powerQuery.setAppIdEq(appId);
+    MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
+    String json = JsonUtils.toJSONStringUnsafe(powerQuery);
+    String post = postHA(OpenAPIConstant.QUERY_JOB, RequestBody.create(jsonType, json));
+    return JSONObject.parseObject(post, LIST_JOB_RESULT_TYPE);
+}
/**
 * Disable one Job by jobId
 * @param jobId jobId

View File

@@ -3,6 +3,8 @@ package com.github.kfcfans.powerjob.client;
import com.alibaba.fastjson.TypeReference;
import com.github.kfcfans.powerjob.common.response.*;
+import java.util.List;
/**
 * TypeReference store.
 *
@@ -19,8 +21,12 @@ public class TypeStore {
public static final TypeReference<ResultDTO<JobInfoDTO>> JOB_RESULT_TYPE = new TypeReference<ResultDTO<JobInfoDTO>>(){};
+public static final TypeReference<ResultDTO<List<JobInfoDTO>>> LIST_JOB_RESULT_TYPE = new TypeReference<ResultDTO<List<JobInfoDTO>>>(){};
public static final TypeReference<ResultDTO<InstanceInfoDTO>> INSTANCE_RESULT_TYPE = new TypeReference<ResultDTO<InstanceInfoDTO>>() {};
+public static final TypeReference<ResultDTO<List<InstanceInfoDTO>>> LIST_INSTANCE_RESULT_TYPE = new TypeReference<ResultDTO<List<InstanceInfoDTO>>>(){};
public static final TypeReference<ResultDTO<WorkflowInfoDTO>> WF_RESULT_TYPE = new TypeReference<ResultDTO<WorkflowInfoDTO>>() {};
public static final TypeReference<ResultDTO<WorkflowInstanceInfoDTO>> WF_INSTANCE_RESULT_TYPE = new TypeReference<ResultDTO<WorkflowInstanceInfoDTO>>() {};

View File

@@ -0,0 +1,20 @@
package com.github.kfcfans.powerjob.client.test;
import com.github.kfcfans.powerjob.client.OhMyClient;
import org.junit.jupiter.api.BeforeAll;
/**
* Initialize OhMyClient
*
* @author tjq
* @since 1/16/21
*/
public class ClientInitializer {
protected static OhMyClient ohMyClient;
@BeforeAll
public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
}
}

View File

@@ -1,3 +1,5 @@
+package com.github.kfcfans.powerjob.client.test;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
@@ -17,17 +19,10 @@ import java.util.concurrent.TimeUnit;
 * @author tjq
 * @since 2020/4/15
 */
-public class TestClient {
+class TestClient extends ClientInitializer {
-private static OhMyClient ohMyClient;
public static final long JOB_ID = 4L;
-@BeforeAll
-public static void initClient() throws Exception {
-    ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
-}
@Test
public void testSaveJob() throws Exception {

View File

@@ -0,0 +1,45 @@
package com.github.kfcfans.powerjob.client.test;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ForkJoinPool;
/**
* TestConcurrencyControl
*
* @author tjq
* @since 1/16/21
*/
class TestConcurrencyControl extends ClientInitializer {
@Test
void testRunJobConcurrencyControl() {
SaveJobInfoRequest saveJobInfoRequest = new SaveJobInfoRequest();
saveJobInfoRequest.setJobName("test concurrency control job");
saveJobInfoRequest.setProcessorType(ProcessorType.SHELL);
saveJobInfoRequest.setProcessorInfo("pwd");
saveJobInfoRequest.setExecuteType(ExecuteType.STANDALONE);
saveJobInfoRequest.setTimeExpressionType(TimeExpressionType.API);
saveJobInfoRequest.setMaxInstanceNum(1);
Long jobId = ohMyClient.saveJob(saveJobInfoRequest).getData();
System.out.println("jobId: " + jobId);
ForkJoinPool pool = new ForkJoinPool(32);
for (int i = 0; i < 100; i++) {
String params = "index-" + i;
pool.execute(() -> {
ResultDTO<Long> res = ohMyClient.runJob(jobId, params, 0);
System.out.println(params + ": " + res);
});
}
}
}

View File

@@ -0,0 +1,49 @@
package com.github.kfcfans.powerjob.client.test;
import com.alibaba.fastjson.JSON;
import com.github.kfcfans.powerjob.common.request.query.JobInfoQuery;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.junit.jupiter.api.Test;
import java.util.Date;
import java.util.List;
/**
* Test the query method
*
* @author tjq
* @since 1/16/21
*/
@Slf4j
class TestQuery extends ClientInitializer {
@Test
void testFetchAllJob() {
ResultDTO<List<JobInfoDTO>> allJobRes = ohMyClient.fetchAllJob();
System.out.println(JSON.toJSONString(allJobRes));
}
@Test
void testQueryJob() {
JobInfoQuery jobInfoQuery = new JobInfoQuery()
.idGt(-1L)
.idLt(10086L)
.jobNameLike("DAG")
.gmtModifiedGt(DateUtils.addYears(new Date(), -10))
.gmtModifiedLt(DateUtils.addDays(new Date(), 10))
.executeTypeIn(Lists.newArrayList(ExecuteType.STANDALONE.getV(), ExecuteType.BROADCAST.getV(), ExecuteType.MAP_REDUCE.getV()))
.timeExpressionIn(Lists.newArrayList(TimeExpressionType.API.name(), TimeExpressionType.CRON.name(), TimeExpressionType.WORKFLOW.name(), TimeExpressionType.FIXED_RATE.name()))
.processorTypeIn(Lists.newArrayList(ProcessorType.EMBEDDED_JAVA.getV(), ProcessorType.SHELL.getV(), ProcessorType.JAVA_CONTAINER.getV()))
.processorInfoLike("com.github.kfcfans");
ResultDTO<List<JobInfoDTO>> jobQueryResult = ohMyClient.queryJob(jobInfoQuery);
System.out.println(JSON.toJSONString(jobQueryResult));
}
}

View File

@@ -1,3 +1,5 @@
+package com.github.kfcfans.powerjob.client.test;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.client.OhMyClient;
import com.github.kfcfans.powerjob.common.ExecuteType;
@@ -7,7 +9,6 @@ import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.google.common.collect.Lists;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.List;
@@ -18,17 +19,10 @@ import java.util.List;
 * @author tjq
 * @since 2020/6/2
 */
-public class TestWorkflow {
+class TestWorkflow extends ClientInitializer {
-private static OhMyClient ohMyClient;
private static final long WF_ID = 1;
-@BeforeAll
-public static void initClient() throws Exception {
-    ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
-}
@Test
public void initTestData() throws Exception {
SaveJobInfoRequest base = new SaveJobInfoRequest();

View File

@@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
-<version>3.4.3</version>
+<version>3.4.4</version>
<packaging>jar</packaging>
<properties>

View File

@@ -15,6 +15,8 @@ public class OpenAPIConstant {
/* ************* JOB 区 ************* */
public static final String SAVE_JOB = "/saveJob";
public static final String FETCH_JOB = "/fetchJob";
+public static final String FETCH_ALL_JOB = "/fetchAllJob";
+public static final String QUERY_JOB = "/queryJob";
public static final String DISABLE_JOB = "/disableJob";
public static final String ENABLE_JOB = "/enableJob";
public static final String DELETE_JOB = "/deleteJob";
@@ -26,6 +28,7 @@ public class OpenAPIConstant {
public static final String RETRY_INSTANCE = "/retryInstance";
public static final String FETCH_INSTANCE_STATUS = "/fetchInstanceStatus";
public static final String FETCH_INSTANCE_INFO = "/fetchInstanceInfo";
+public static final String QUERY_INSTANCE = "/queryInstance";
/* ************* Workflow 区 ************* */
public static final String SAVE_WORKFLOW = "/saveWorkflow";

View File

@@ -21,4 +21,6 @@ public class PowerJobDKey {
 */
public static final String IGNORED_NETWORK_INTERFACE_REGEX = "powerjob.network.interface.ignored";
+public static final String WORKER_STATUS_CHECK_PERIOD = "powerjob.worker.status-check.normal.period";
}
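The new `WORKER_STATUS_CHECK_PERIOD` key follows the usual `-D` system-property pattern used by the other `PowerJobDKey` constants. A minimal sketch of how a worker might read it — the class name and the fallback of `10` seconds here are assumptions for illustration, not the framework's actual default:

```java
public class WorkerStatusCheckPeriodDemo {

    // Same literal as PowerJobDKey.WORKER_STATUS_CHECK_PERIOD in this commit.
    static final String KEY = "powerjob.worker.status-check.normal.period";

    // Reads the check period in seconds; the fallback value is hypothetical.
    static long statusCheckPeriodSeconds() {
        return Long.parseLong(System.getProperty(KEY, "10"));
    }

    public static void main(String[] args) {
        // e.g. launched with -Dpowerjob.worker.status-check.normal.period=15
        System.setProperty(KEY, "15");
        System.out.println(statusCheckPeriodSeconds());
    }
}
```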

View File

@@ -0,0 +1,41 @@
package com.github.kfcfans.powerjob.common;
import lombok.Getter;
import lombok.Setter;
/**
* PowerJob Query interface
*
* @author tjq
* @since 2021/1/15
*/
@Getter
@Setter
public abstract class PowerQuery {
public static String EQUAL = "Eq";
public static String NOT_EQUAL = "NotEq";
public static String LIKE = "Like";
public static String NOT_LIKE = "NotLike";
public static String LESS_THAN = "Lt";
public static String LESS_THAN_EQUAL = "LtEq";
public static String GREATER_THAN = "Gt";
public static String GREATER_THAN_EQUAL = "GtEq";
public static String IN = "In";
public static String NOT_IN = "NotIn";
public static String IS_NULL = "IsNull";
public static String IS_NOT_NULL = "IsNotNull";
private Long appIdEq;
}
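The `PowerQuery` suffix constants above drive a reflection-based translation: a field such as `jobNameLike` on a subclass is split into the column `jobName` and the operator `Like` (the server-side `QueryConvertUtils` in this commit does exactly that against JPA criteria). A standalone sketch of the naming convention, using hypothetical demo types rather than the real PowerJob classes:

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class SuffixConventionDemo {

    // Hypothetical query object following the PowerQuery field-naming convention.
    static class DemoQuery {
        Long idGt = 1L;                // "Gt" suffix -> greater-than condition on "id"
        String jobNameLike = "DAG";    // "Like" suffix -> LIKE condition on "jobName"
        String processorInfoEq = null; // null fields are skipped, as in QueryConvertUtils
    }

    // Renders each non-null field as a pseudo-SQL condition string.
    static List<String> toConditions(Object query) throws IllegalAccessException {
        List<String> conditions = new ArrayList<>();
        for (Field field : query.getClass().getDeclaredFields()) {
            field.setAccessible(true);
            Object value = field.get(query);
            if (value == null) {
                continue;
            }
            String name = field.getName();
            if (name.endsWith("Like")) {
                conditions.add(strip(name, "Like") + " LIKE %" + value + "%");
            } else if (name.endsWith("Gt")) {
                conditions.add(strip(name, "Gt") + " > " + value);
            } else if (name.endsWith("Eq")) {
                conditions.add(strip(name, "Eq") + " = " + value);
            }
        }
        return conditions;
    }

    private static String strip(String fieldName, String suffix) {
        return fieldName.substring(0, fieldName.length() - suffix.length());
    }

    public static void main(String[] args) throws Exception {
        toConditions(new DemoQuery()).forEach(System.out::println);
    }
}
```

The real implementation additionally handles `NotEq`, `NotLike`, `LtEq`/`GtEq`, `In`/`NotIn` and null checks; the sketch only shows the suffix-dispatch idea.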

View File

@@ -22,6 +22,7 @@ public class SystemInstanceResult {
public static final String UNKNOWN_BUG = "unknown bug";
// TaskTracker 长时间未上报
public static final String REPORT_TIMEOUT = "worker report timeout, maybe TaskTracker down";
+public static final String CAN_NOT_FIND_JOB_INFO = "can't find job info";
/* *********** workflow 专用 *********** */
public static final String MIDDLE_JOB_FAILED = "middle job failed";

View File

@@ -0,0 +1,53 @@
package com.github.kfcfans.powerjob.common.request.query;
import com.github.kfcfans.powerjob.common.PowerQuery;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import java.util.Date;
import java.util.List;
/**
* Query JobInfo
*
* @author tjq
* @since 1/16/21
*/
@Getter
@Setter
@Accessors(chain = true, fluent = true)
public class JobInfoQuery extends PowerQuery {
private Long idEq;
private Long idLt;
private Long idGt;
private String jobNameEq;
private String jobNameLike;
private String jobDescriptionLike;
private String jobParamsLike;
private List<Integer> timeExpressionTypeIn;
private List<String> timeExpressionIn;
private List<Integer> executeTypeIn;
private List<Integer> processorTypeIn;
private String processorInfoEq;
private String processorInfoLike;
private List<Integer> statusIn;
private Long nextTriggerTimeGt;
private Long nextTriggerTimeLt;
private String notifyUserIdsLike;
private Date gmtCreateLt;
private Date gmtCreateGt;
private Date gmtModifiedLt;
private Date gmtModifiedGt;
}
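Lombok's `@Accessors(chain = true, fluent = true)` on `JobInfoQuery` is what enables the `new JobInfoQuery().idGt(-1L).jobNameLike("DAG")` style used in `TestQuery` later in this commit: the generated setters are named after the field and return `this`. A Lombok-free sketch of the same pattern (hypothetical class, not part of the commit):

```java
public class FluentQueryDemo {

    private Long idGt;
    private String jobNameLike;

    // Fluent setters: named after the field, returning this so calls chain.
    public FluentQueryDemo idGt(Long v) { this.idGt = v; return this; }
    public FluentQueryDemo jobNameLike(String v) { this.jobNameLike = v; return this; }

    // Fluent getters carry the field name with no "get" prefix.
    public Long idGt() { return idGt; }
    public String jobNameLike() { return jobNameLike; }

    public static void main(String[] args) {
        FluentQueryDemo q = new FluentQueryDemo().idGt(-1L).jobNameLike("DAG");
        System.out.println(q.idGt() + " " + q.jobNameLike());
    }
}
```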

View File

@@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
-<version>3.4.3</version>
+<version>3.4.4</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.3.4.RELEASE</springboot.version>
-<powerjob.common.version>3.4.3</powerjob.common.version>
+<powerjob.common.version>3.4.4</powerjob.common.version>
<!-- MySQL version that corresponds to spring-boot-dependencies version. -->
<mysql.version>8.0.19</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version>

View File

@@ -0,0 +1,63 @@
package com.github.kfcfans.powerjob.server.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import java.lang.reflect.Method;
/**
* AOP Utils
*
* @author tjq
* @since 1/16/21
*/
@Slf4j
public class AOPUtils {
private static final ExpressionParser parser = new SpelExpressionParser();
private static final ParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();
public static Method parseMethod(ProceedingJoinPoint joinPoint) {
Signature pointSignature = joinPoint.getSignature();
if (!(pointSignature instanceof MethodSignature)) {
throw new IllegalArgumentException("this annotation should be used on a method!");
}
MethodSignature signature = (MethodSignature) pointSignature;
Method method = signature.getMethod();
if (method.getDeclaringClass().isInterface()) {
try {
method = joinPoint.getTarget().getClass().getDeclaredMethod(pointSignature.getName(), method.getParameterTypes());
} catch (SecurityException | NoSuchMethodException e) {
ExceptionUtils.rethrow(e);
}
}
return method;
}
public static <T> T parseSpEl(Method method, Object[] arguments, String spEl, Class<T> clazz, T defaultResult) {
String[] params = discoverer.getParameterNames(method);
assert params != null;
EvaluationContext context = new StandardEvaluationContext();
for (int len = 0; len < params.length; len++) {
context.setVariable(params[len], arguments[len]);
}
try {
Expression expression = parser.parseExpression(spEl);
return expression.getValue(context, clazz);
} catch (Exception e) {
log.error("[AOPUtils] parse SpEL failed for method[{}], please concat @tjq to fix the bug!", method.getName(), e);
return defaultResult;
}
}
}

View File

@@ -0,0 +1,104 @@
package com.github.kfcfans.powerjob.server.common.utils;
import com.alibaba.fastjson.JSONArray;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.PowerQuery;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.jpa.domain.Specification;
import javax.persistence.criteria.*;
import java.lang.reflect.Field;
import java.util.List;
/**
* automatically converts a query object to a JPA Specification
*
* @author tjq
* @since 2021/1/15
*/
@Slf4j
@SuppressWarnings({"unchecked", "rawtypes"})
public class QueryConvertUtils {
public static <T> Specification<T> toSpecification(PowerQuery powerQuery) {
return (Specification<T>) (root, query, cb) -> {
List<Predicate> predicates = Lists.newLinkedList();
Field[] fields = powerQuery.getClass().getDeclaredFields();
try {
for (Field field : fields) {
field.setAccessible(true);
String fieldName = field.getName();
Object fieldValue = field.get(powerQuery);
if (fieldValue == null) {
continue;
}
if (fieldName.endsWith(PowerQuery.EQUAL)) {
String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.EQUAL);
predicates.add(cb.equal(root.get(colName), fieldValue));
} else if (fieldName.endsWith(PowerQuery.NOT_EQUAL)) {
String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.NOT_EQUAL);
predicates.add(cb.notEqual(root.get(colName), fieldValue));
} else if (fieldName.endsWith(PowerQuery.LIKE)) {
String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.LIKE);
predicates.add(cb.like(root.get(colName), convertLikeParams(fieldValue)));
} else if (fieldName.endsWith(PowerQuery.NOT_LIKE)) {
String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.NOT_LIKE);
predicates.add(cb.notLike(root.get(colName), convertLikeParams(fieldValue)));
} else if (fieldName.endsWith(PowerQuery.LESS_THAN)) {
String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.LESS_THAN);
predicates.add(cb.lessThan(root.get(colName), (Comparable)fieldValue));
} else if (fieldName.endsWith(PowerQuery.GREATER_THAN)) {
String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.GREATER_THAN);
predicates.add(cb.greaterThan(root.get(colName), (Comparable)fieldValue));
} else if (fieldName.endsWith(PowerQuery.LESS_THAN_EQUAL)) {
String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.LESS_THAN_EQUAL);
predicates.add(cb.lessThanOrEqualTo(root.get(colName), (Comparable)fieldValue));
} else if (fieldName.endsWith(PowerQuery.GREATER_THAN_EQUAL)) {
String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.GREATER_THAN_EQUAL);
predicates.add(cb.greaterThanOrEqualTo(root.get(colName), (Comparable)fieldValue));
} else if (fieldName.endsWith(PowerQuery.IN)) {
String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.IN);
predicates.add(root.get(colName).in(convertInParams(fieldValue)));
} else if (fieldName.endsWith(PowerQuery.NOT_IN)) {
String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.NOT_IN);
predicates.add(cb.not(root.get(colName).in(convertInParams(fieldValue))));
} else if (fieldName.endsWith(PowerQuery.IS_NULL)) {
String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.IS_NULL);
predicates.add(cb.isNull(root.get(colName)));
} else if (fieldName.endsWith(PowerQuery.IS_NOT_NULL)) {
String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.IS_NOT_NULL);
predicates.add(cb.isNotNull(root.get(colName)));
}
}
} catch (Exception e) {
log.warn("[QueryConvertUtils] convert failed for query: {}", query, e);
throw new PowerJobException("convert query object failed, maybe you should redesign your query object!");
}
if (powerQuery.getAppIdEq() != null) {
predicates.add(cb.equal(root.get("appId"), powerQuery.getAppIdEq()));
}
return query.where(predicates.toArray(new Predicate[0])).getRestriction();
};
}
private static String convertLikeParams(Object o) {
String s = (String) o;
if (!s.startsWith("%")) {
s = "%" + s;
}
if (!s.endsWith("%")) {
s = s + "%";
}
return s;
}
private static Object[] convertInParams(Object o) {
// FastJSON, the GOAT
return JSONArray.parseArray(JSONArray.toJSONString(o)).toArray();
}
}
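The field-name convention the converter relies on can be shown with a small self-contained sketch (plain Java, no JPA): a query field is assumed to be named column name + operator suffix, e.g. `jobNameLike` becomes a LIKE predicate on column `jobName`. `QueryFieldSketch` and its suffix literals are hypothetical stand-ins for a real `PowerQuery` subclass and its suffix constants.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical query object following the PowerQuery naming convention:
// field name = column name + operator suffix.
class QueryFieldSketch {
    public String appNameEq;   // -> cb.equal(root.get("appName"), value)
    public String jobNameLike; // -> cb.like(root.get("jobName"), "%value%")

    // mirrors StringUtils.substringBeforeLast(fieldName, suffix)
    static String columnOf(String fieldName, String suffix) {
        return fieldName.substring(0, fieldName.lastIndexOf(suffix));
    }

    // collects (column -> value) pairs for one operator suffix, the way
    // toSpecification walks the declared fields and skips null values
    static Map<String, Object> collect(Object query, String suffix) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Field f : query.getClass().getDeclaredFields()) {
            try {
                f.setAccessible(true);
                Object v = f.get(query);
                if (v != null && f.getName().endsWith(suffix)) {
                    result.put(columnOf(f.getName(), suffix), v);
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        QueryFieldSketch q = new QueryFieldSketch();
        q.jobNameLike = "daily-report";
        System.out.println(collect(q, "Like")); // {jobName=daily-report}
    }
}
```

The real converter builds one `Predicate` per matched field and ANDs them together via `query.where(...)`.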

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.powerjob.server.service.lock; package com.github.kfcfans.powerjob.server.extension;
/** /**
* Lock service. None of its methods may throw any exception! * Lock service. None of its methods may throw any exception!
@ -14,7 +14,7 @@ public interface LockService {
* @param maxLockTime maximum time to hold the lock, in milliseconds (ms) * @param maxLockTime maximum time to hold the lock, in milliseconds (ms)
* @return true -> lock acquired, false -> lock not acquired * @return true -> lock acquired, false -> lock not acquired
*/ */
boolean lock(String name, long maxLockTime); boolean tryLock(String name, long maxLockTime);
/** /**
* Release the lock * Release the lock

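The rename from `lock` to `tryLock` makes the non-blocking semantics explicit: the call returns `false` instead of waiting or throwing, and a held lock expires after `maxLockTime`. Below is a minimal in-memory sketch of that contract (hypothetical; the real implementation, `DatabaseLockService`, is backed by a database table):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory illustration of the LockService contract:
// tryLock never blocks and never throws; stale leases are replaced.
class InMemoryLockService {

    private static final class Lease {
        final long deadline;
        Lease(long deadline) { this.deadline = deadline; }
    }

    private final ConcurrentMap<String, Lease> locks = new ConcurrentHashMap<>();

    public boolean tryLock(String name, long maxLockTime) {
        long now = System.currentTimeMillis();
        Lease mine = new Lease(now + maxLockTime);
        // keep the existing lease unless it has expired; identity tells us who won
        Lease winner = locks.merge(name, mine, (old, fresh) -> old.deadline < now ? fresh : old);
        return winner == mine;
    }

    public void unlock(String name) {
        locks.remove(name);
    }

    public static void main(String[] args) {
        InMemoryLockService lockService = new InMemoryLockService();
        // the usual caller pattern, as in CleanService#cleanByOneServer:
        if (lockService.tryLock("history-delete-lock", 10 * 60 * 1000)) {
            try {
                // ... exclusive work ...
            } finally {
                lockService.unlock("history-delete-lock");
            }
        }
    }
}
```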
View File

@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.persistence.core.repository;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.errorprone.annotations.CanIgnoreReturnValue;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.Query;
@ -16,7 +17,7 @@ import java.util.List;
* @author tjq * @author tjq
* @since 2020/4/1 * @since 2020/4/1
*/ */
public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Long> { public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Long>, JpaSpecificationExecutor<InstanceInfoDO> {
/** /**
* Count how many instances of the current job are running * Count how many instances of the current job are running

View File

@ -4,6 +4,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import org.springframework.data.domain.Page; import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.Query;
import java.util.List; import java.util.List;
@ -14,7 +15,7 @@ import java.util.List;
* @author tjq * @author tjq
* @since 2020/4/1 * @since 2020/4/1
*/ */
public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> { public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long>, JpaSpecificationExecutor<JobInfoDO> {
// for scheduling only // for scheduling only
@ -32,4 +33,6 @@ public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> {
long countByAppIdAndStatusNot(long appId, int status); long countByAppIdAndStatusNot(long appId, int status);
List<JobInfoDO> findByAppId(Long appId);
} }

View File

@ -18,7 +18,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO
import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager; import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.lock.LockService; import com.github.kfcfans.powerjob.server.extension.LockService;
import com.github.kfcfans.powerjob.server.web.request.SaveContainerInfoRequest; import com.github.kfcfans.powerjob.server.web.request.SaveContainerInfoRequest;
import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -209,7 +209,7 @@ public class ContainerService {
String deployLock = "containerDeployLock-" + containerId; String deployLock = "containerDeployLock-" + containerId;
RemoteEndpoint.Async remote = session.getAsyncRemote(); RemoteEndpoint.Async remote = session.getAsyncRemote();
// maximum deploy time: 10 minutes // maximum deploy time: 10 minutes
boolean lock = lockService.lock(deployLock, 10 * 60 * 1000); boolean lock = lockService.tryLock(deployLock, 10 * 60 * 1000);
if (!lock) { if (!lock) {
remote.sendText("SYSTEM: acquire deploy lock failed, maybe other user is deploying, please wait until the running deploy task finished."); remote.sendText("SYSTEM: acquire deploy lock failed, maybe other user is deploying, please wait until the running deploy task finished.");
return; return;

View File

@ -10,6 +10,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
import com.github.kfcfans.powerjob.server.service.instance.InstanceMetadataService; import com.github.kfcfans.powerjob.server.service.instance.InstanceMetadataService;
import com.github.kfcfans.powerjob.server.service.lock.local.UseSegmentLock;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -46,6 +47,7 @@ public class DispatchService {
private static final Splitter commaSplitter = Splitter.on(","); private static final Splitter commaSplitter = Splitter.on(",");
@UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024)
public void redispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) { public void redispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) {
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
dispatch(jobInfo, instanceId, currentRunningTimes, instanceInfo.getInstanceParams(), instanceInfo.getWfInstanceId()); dispatch(jobInfo, instanceId, currentRunningTimes, instanceInfo.getInstanceParams(), instanceInfo.getWfInstanceId());
@ -59,6 +61,7 @@ public class DispatchService {
* @param instanceParams instance runtime parameters (API trigger only) * @param instanceParams instance runtime parameters (API trigger only)
* @param wfInstanceId workflow instance ID (workflow tasks only) * @param wfInstanceId workflow instance ID (workflow tasks only)
*/ */
@UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024)
public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes, String instanceParams, Long wfInstanceId) { public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes, String instanceParams, Long wfInstanceId) {
Long jobId = jobInfo.getId(); Long jobId = jobInfo.getId();
log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, instanceParams); log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, instanceParams);
@ -76,15 +79,20 @@ public class DispatchService {
// count the currently running instances // count the currently running instances
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
// 0 means unlimited (and for online tasks it saves one DB query)
Integer maxInstanceNum = jobInfo.getMaxInstanceNum(); Integer maxInstanceNum = jobInfo.getMaxInstanceNum();
// frequent (second-level) tasks are dispatched to only one machine; the actual maxInstanceNum is controlled by the TaskTracker
if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) {
maxInstanceNum = 1;
}
// 0 means unlimited (and for online tasks it saves one DB query)
if (maxInstanceNum > 0) { if (maxInstanceNum > 0) {
// this runningInstanceCount already includes the current instance
// WAITING_DISPATCH is not counted: delayed tasks triggered via OpenAPI should not be counted (e.g. a delay of 1 day) // WAITING_DISPATCH is not counted: delayed tasks triggered via OpenAPI should not be counted (e.g. a delay of 1 day)
// since WAITING_DISPATCH is not counted, this runningInstanceCount does NOT include the current instance itself
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV())); long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()));
// max concurrent-instance limit reached, skip this dispatch // max concurrent-instance limit reached, skip this dispatch
if (runningInstanceCount > maxInstanceNum) { if (runningInstanceCount >= maxInstanceNum) {
String result = String.format(SystemInstanceResult.TOO_MANY_INSTANCES, runningInstanceCount, maxInstanceNum); String result = String.format(SystemInstanceResult.TOO_MANY_INSTANCES, runningInstanceCount, maxInstanceNum);
log.warn("[Dispatcher-{}|{}] cancel dispatching job because too many instances are running ({} >= {}).", jobId, instanceId, runningInstanceCount, maxInstanceNum); log.warn("[Dispatcher-{}|{}] cancel dispatching job because too many instances are running ({} >= {}).", jobId, instanceId, runningInstanceCount, maxInstanceNum);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now);
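The comparison change (`>` to `>=`) matters because `runningInstanceCount` only counts WAITING_WORKER_RECEIVE and RUNNING instances, so the instance about to be dispatched is not yet included; equality already means the quota is full. A self-contained sketch of the gate (a hypothetical extraction; `canDispatch` is not a real PowerJob method):

```java
// Hypothetical extraction of DispatchService's quota check.
// runningInstanceCount excludes the instance currently being dispatched
// (it is still in WAITING_DISPATCH, which is deliberately not counted).
class DispatchQuotaGate {

    static boolean canDispatch(long runningInstanceCount, int maxInstanceNum) {
        if (maxInstanceNum <= 0) {
            return true; // 0 means unlimited, and skips the DB count entirely
        }
        // dispatching would raise the effective count to runningInstanceCount + 1,
        // so refuse as soon as the quota is already reached
        return runningInstanceCount < maxInstanceNum;
    }

    public static void main(String[] args) {
        // with maxInstanceNum = 1 the old ">" check would wrongly allow a second instance
        System.out.println(canDispatch(1, 1)); // false
    }
}
```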

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.service;
import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.PowerQuery;
import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.JobInfoDTO; import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
@ -9,6 +10,7 @@ import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.redirect.DesignateServer; import com.github.kfcfans.powerjob.server.common.redirect.DesignateServer;
import com.github.kfcfans.powerjob.server.common.utils.CronExpression; import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
@ -17,6 +19,7 @@ import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService; import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -24,6 +27,7 @@ import javax.annotation.Resource;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors;
/** /**
* Job service * Job service
@ -89,10 +93,16 @@ public class JobService {
} }
public JobInfoDTO fetchJob(Long jobId) { public JobInfoDTO fetchJob(Long jobId) {
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId)); return convert(jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId)));
JobInfoDTO jobInfoDTO = new JobInfoDTO(); }
BeanUtils.copyProperties(jobInfoDO, jobInfoDTO);
return jobInfoDTO; public List<JobInfoDTO> fetchAllJob(Long appId) {
return jobInfoRepository.findByAppId(appId).stream().map(JobService::convert).collect(Collectors.toList());
}
public List<JobInfoDTO> queryJob(PowerQuery powerQuery) {
Specification<JobInfoDO> specification = QueryConvertUtils.toSpecification(powerQuery);
return jobInfoRepository.findAll(specification).stream().map(JobService::convert).collect(Collectors.toList());
} }
/** /**
@ -225,4 +235,10 @@ public class JobService {
} }
} }
private static JobInfoDTO convert(JobInfoDO jobInfoDO) {
JobInfoDTO jobInfoDTO = new JobInfoDTO();
BeanUtils.copyProperties(jobInfoDO, jobInfoDTO);
return jobInfoDTO;
}
} }

View File

@ -8,7 +8,7 @@ import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.akka.requests.Ping; import com.github.kfcfans.powerjob.server.akka.requests.Ping;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.service.lock.LockService; import com.github.kfcfans.powerjob.server.extension.LockService;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -76,7 +76,7 @@ public class ServerSelectService {
// no server available; re-run the server election, which requires a lock // no server available; re-run the server election, which requires a lock
String lockName = String.format(SERVER_ELECT_LOCK, appId); String lockName = String.format(SERVER_ELECT_LOCK, appId);
boolean lockStatus = lockService.lock(lockName, 30000); boolean lockStatus = lockService.tryLock(lockName, 30000);
if (!lockStatus) { if (!lockStatus) {
try { try {
Thread.sleep(500); Thread.sleep(500);

View File

@ -162,7 +162,10 @@ public class InstanceManager {
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
JobInstanceAlarm content = new JobInstanceAlarm(); JobInstanceAlarm content = new JobInstanceAlarm();
BeanUtils.copyProperties(jobInfo, content); BeanUtils.copyProperties(jobInfo, content);
// after the database has been cleaned, instanceInfo may be null, which would cause an NPE
if (instanceInfo != null) {
BeanUtils.copyProperties(instanceInfo, content); BeanUtils.copyProperties(instanceInfo, content);
}
List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds()); List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds());
AlarmCenter.alarmFailed(content, userList); AlarmCenter.alarmFailed(content, userList);

View File

@ -2,10 +2,7 @@ package com.github.kfcfans.powerjob.server.service.instance;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.*;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.SystemInstanceResult;
import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq; import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq;
@ -14,6 +11,7 @@ import com.github.kfcfans.powerjob.common.response.InstanceInfoDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.constans.InstanceType; import com.github.kfcfans.powerjob.server.common.constans.InstanceType;
import com.github.kfcfans.powerjob.server.common.redirect.DesignateServer; import com.github.kfcfans.powerjob.server.common.redirect.DesignateServer;
import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture; import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
@ -28,8 +26,10 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.Duration; import java.time.Duration;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.github.kfcfans.powerjob.common.InstanceStatus.RUNNING; import static com.github.kfcfans.powerjob.common.InstanceStatus.RUNNING;
import static com.github.kfcfans.powerjob.common.InstanceStatus.STOPPED; import static com.github.kfcfans.powerjob.common.InstanceStatus.STOPPED;
@ -201,16 +201,21 @@ public class InstanceService {
} }
} }
public List<InstanceInfoDTO> queryInstanceInfo(PowerQuery powerQuery) {
return instanceInfoRepository
.findAll(QueryConvertUtils.toSpecification(powerQuery))
.stream()
.map(InstanceService::directConvert)
.collect(Collectors.toList());
}
/** /**
* Get task instance info * Get task instance info
* @param instanceId task instance ID * @param instanceId task instance ID
* @return task instance info * @return task instance info
*/ */
public InstanceInfoDTO getInstanceInfo(Long instanceId) { public InstanceInfoDTO getInstanceInfo(Long instanceId) {
InstanceInfoDO instanceInfoDO = fetchInstanceInfo(instanceId); return directConvert(fetchInstanceInfo(instanceId));
InstanceInfoDTO instanceInfoDTO = new InstanceInfoDTO();
BeanUtils.copyProperties(instanceInfoDO, instanceInfoDTO);
return instanceInfoDTO;
} }
/** /**
@ -276,4 +281,10 @@ public class InstanceService {
} }
return instanceInfoDO; return instanceInfoDO;
} }
private static InstanceInfoDTO directConvert(InstanceInfoDO instanceInfoDO) {
InstanceInfoDTO instanceInfoDTO = new InstanceInfoDTO();
BeanUtils.copyProperties(instanceInfoDO, instanceInfoDTO);
return instanceInfoDTO;
}
} }

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.service.lock;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.extension.LockService;
import com.github.kfcfans.powerjob.server.persistence.core.model.OmsLockDO; import com.github.kfcfans.powerjob.server.persistence.core.model.OmsLockDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.OmsLockRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.OmsLockRepository;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -24,7 +25,7 @@ public class DatabaseLockService implements LockService {
private OmsLockRepository omsLockRepository; private OmsLockRepository omsLockRepository;
@Override @Override
public boolean lock(String name, long maxLockTime) { public boolean tryLock(String name, long maxLockTime) {
OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost(), maxLockTime); OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost(), maxLockTime);
try { try {
@ -43,7 +44,7 @@ public class DatabaseLockService implements LockService {
log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO); log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO);
unlock(name); unlock(name);
return lock(name, maxLockTime); return tryLock(name, maxLockTime);
} }
return false; return false;
} }

View File

@ -0,0 +1,23 @@
package com.github.kfcfans.powerjob.server.service.lock.local;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* use a segment lock to ensure concurrency safety
*
* @author tjq
* @since 1/16/21
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseSegmentLock {
String type();
String key();
int concurrencyLevel();
}

View File

@ -0,0 +1,43 @@
package com.github.kfcfans.powerjob.server.service.lock.local;
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import com.github.kfcfans.powerjob.server.common.utils.AOPUtils;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* aspect for @UseSegmentLock
*
* @author tjq
* @since 1/16/21
*/
@Slf4j
@Aspect
@Component
public class UseSegmentLockAspect {
private final Map<String, SegmentLock> lockStore = Maps.newConcurrentMap();
@Around(value = "@annotation(useSegmentLock)")
public Object execute(ProceedingJoinPoint point, UseSegmentLock useSegmentLock) throws Throwable {
SegmentLock segmentLock = lockStore.computeIfAbsent(useSegmentLock.type(), ignore -> {
int concurrencyLevel = useSegmentLock.concurrencyLevel();
log.info("[UseSegmentLockAspect] create SegmentLock for [{}] with concurrencyLevel: {}", useSegmentLock.type(), concurrencyLevel);
return new SegmentLock(concurrencyLevel);
});
int index = AOPUtils.parseSpEl(AOPUtils.parseMethod(point), point.getArgs(), useSegmentLock.key(), Integer.class, 1);
try {
segmentLock.lockInterruptibleSafe(index);
return point.proceed();
} finally {
segmentLock.unlock(index);
}
}
}
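The `SegmentLock` used by the aspect is PowerJob's striped-lock utility; the idea can be sketched in a few lines (`SimpleSegmentLock` is hypothetical, assuming the real class maps an int key onto a fixed pool of `ReentrantLock`s):

```java
import java.util.concurrent.locks.ReentrantLock;

// Minimal striped-lock sketch: keys that hash to the same segment contend,
// all other keys proceed in parallel -- same idea as SegmentLock(concurrencyLevel).
class SimpleSegmentLock {

    private final ReentrantLock[] segments;

    SimpleSegmentLock(int concurrencyLevel) {
        segments = new ReentrantLock[concurrencyLevel];
        for (int i = 0; i < concurrencyLevel; i++) {
            segments[i] = new ReentrantLock();
        }
    }

    private ReentrantLock segmentFor(int key) {
        return segments[Math.floorMod(key, segments.length)];
    }

    void lock(int key)   { segmentFor(key).lock(); }
    void unlock(int key) { segmentFor(key).unlock(); }

    boolean heldByCurrentThread(int key) {
        return segmentFor(key).isHeldByCurrentThread();
    }
}
```

With `concurrencyLevel = 1024` and `key = jobId.intValue()`, as in the `@UseSegmentLock` usages on `DispatchService`, two dispatches of the same job serialize on one segment while unrelated jobs rarely collide.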

View File

@ -7,7 +7,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager; import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.lock.LockService; import com.github.kfcfans.powerjob.server.extension.LockService;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -78,7 +78,7 @@ public class CleanService {
*/ */
private void cleanByOneServer() { private void cleanByOneServer() {
// once the first server grabs the lock the others return immediately, so holding the lock for 10 minutes should be enough // once the first server grabs the lock the others return immediately, so holding the lock for 10 minutes should be enough
boolean lock = lockService.lock(HISTORY_DELETE_LOCK, 10 * 60 * 1000); boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, 10 * 60 * 1000);
if (!lock) { if (!lock) {
log.info("[CleanService] clean job is already running, just return."); log.info("[CleanService] clean job is already running, just return.");
return; return;

View File

@ -96,7 +96,7 @@ public class InstanceStatusCheckService {
long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS; long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
List<InstanceInfoDO> waitingDispatchInstances = instanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold); List<InstanceInfoDO> waitingDispatchInstances = instanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold);
if (!CollectionUtils.isEmpty(waitingDispatchInstances)) { if (!CollectionUtils.isEmpty(waitingDispatchInstances)) {
log.warn("[InstanceStatusChecker] instances({}) is not triggered as expected.", waitingDispatchInstances); log.warn("[InstanceStatusChecker] find some instance which is not triggered as expected: {}", waitingDispatchInstances);
waitingDispatchInstances.forEach(instance -> { waitingDispatchInstances.forEach(instance -> {
// filter out task instances reset to WAITING_DISPATCH by failure retries // filter out task instances reset to WAITING_DISPATCH by failure retries
@ -105,9 +105,13 @@ public class InstanceStatusCheckService {
return; return;
} }
// redispatch (orElseGet just silences a compiler warning...) Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(instance.getJobId());
JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); if (jobInfoOpt.isPresent()) {
dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), 0); dispatchService.redispatch(jobInfoOpt.get(), instance.getInstanceId(), 0);
} else {
log.warn("[InstanceStatusChecker] can't find job by jobId[{}], so redispatch failed, failed instance: {}", instance.getJobId(), instance);
updateFailedInstance(instance, SystemInstanceResult.CAN_NOT_FIND_JOB_INFO);
}
}); });
} }
@ -136,7 +140,7 @@ public class InstanceStatusCheckService {
// if the job has been disabled, do not retry; just mark the instance as failed (frequent tasks are also marked failed directly and rescheduled by the dispatcher) // if the job has been disabled, do not retry; just mark the instance as failed (frequent tasks are also marked failed directly and rescheduled by the dispatcher)
if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) { if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) {
updateFailedInstance(instance); updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT);
return; return;
} }
@ -144,7 +148,7 @@ public class InstanceStatusCheckService {
if (instance.getRunningTimes() < jobInfoDO.getInstanceRetryNum()) { if (instance.getRunningTimes() < jobInfoDO.getInstanceRetryNum()) {
dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes());
}else { }else {
updateFailedInstance(instance); updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT);
} }
}); });
@ -182,14 +186,14 @@ public class InstanceStatusCheckService {
/** /**
* Handle task instances that failed due to report timeout * Handle task instances that failed due to report timeout
*/ */
private void updateFailedInstance(InstanceInfoDO instance) { private void updateFailedInstance(InstanceInfoDO instance, String result) {
log.warn("[InstanceStatusCheckService] detected instance(instanceId={},jobId={})'s TaskTracker report timeout, this instance is considered a failure.", instance.getInstanceId(), instance.getJobId()); log.warn("[InstanceStatusChecker] instance[{}] failed due to {}, instanceInfo: {}", instance.getInstanceId(), result, instance);
instance.setStatus(InstanceStatus.FAILED.getV()); instance.setStatus(InstanceStatus.FAILED.getV());
instance.setFinishedTime(System.currentTimeMillis()); instance.setFinishedTime(System.currentTimeMillis());
instance.setGmtModified(new Date()); instance.setGmtModified(new Date());
instance.setResult(SystemInstanceResult.REPORT_TIMEOUT); instance.setResult(result);
instanceInfoRepository.saveAndFlush(instance); instanceInfoRepository.saveAndFlush(instance);
instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, SystemInstanceResult.REPORT_TIMEOUT); instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, SystemInstanceResult.REPORT_TIMEOUT);

View File

@@ -2,7 +2,9 @@ package com.github.kfcfans.powerjob.server.web.controller;
 import com.github.kfcfans.powerjob.common.InstanceStatus;
 import com.github.kfcfans.powerjob.common.OpenAPIConstant;
+import com.github.kfcfans.powerjob.common.PowerQuery;
 import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
+import com.github.kfcfans.powerjob.common.request.query.JobInfoQuery;
 import com.github.kfcfans.powerjob.server.service.AppInfoService;
 import com.github.kfcfans.powerjob.server.service.CacheService;
 import com.github.kfcfans.powerjob.server.service.JobService;
@@ -14,6 +16,7 @@ import com.github.kfcfans.powerjob.common.response.*;
 import org.springframework.web.bind.annotation.*;

 import javax.annotation.Resource;
+import java.util.List;

 /**
  * OpenAPI controller, consumed by oms-client
@@ -60,6 +63,16 @@ public class OpenAPIController {
         return ResultDTO.success(jobService.fetchJob(jobId));
     }

+    @PostMapping(OpenAPIConstant.FETCH_ALL_JOB)
+    public ResultDTO<List<JobInfoDTO>> fetchAllJob(Long appId) {
+        return ResultDTO.success(jobService.fetchAllJob(appId));
+    }
+
+    @PostMapping(OpenAPIConstant.QUERY_JOB)
+    public ResultDTO<List<JobInfoDTO>> queryJob(@RequestBody JobInfoQuery powerQuery) {
+        return ResultDTO.success(jobService.queryJob(powerQuery));
+    }
+
     @PostMapping(OpenAPIConstant.DELETE_JOB)
     public ResultDTO<Void> deleteJob(Long jobId, Long appId) {
         checkJobIdValid(jobId, appId);
@@ -119,6 +132,11 @@ public class OpenAPIController {
         return ResultDTO.success(instanceService.getInstanceInfo(instanceId));
     }

+    @PostMapping(OpenAPIConstant.QUERY_INSTANCE)
+    public ResultDTO<List<InstanceInfoDTO>> queryInstance(@RequestBody PowerQuery powerQuery) {
+        return ResultDTO.success(instanceService.queryInstanceInfo(powerQuery));
+    }
+
     /* ************* Workflow section ************* */

     @PostMapping(OpenAPIConstant.SAVE_WORKFLOW)
     public ResultDTO<Long> saveWorkflow(@RequestBody SaveWorkflowRequest request) throws Exception {
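The new `QUERY_JOB` endpoint above accepts a JSON query body over POST. A hedged sketch of how a caller might hit it with plain HTTP; the `/openApi/queryJob` path and body shape are assumptions (the real path constant lives in `OpenAPIConstant`, and the official `PowerJobClient` wraps these calls):

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical request builder for the new query endpoint; path is assumed, not taken
// from OpenAPIConstant.
public class QueryJobRequestDemo {
    static HttpRequest buildQueryJobRequest(String serverAddress, String jsonQuery) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + serverAddress + "/openApi/queryJob")) // assumed path
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonQuery))
                .build();
    }
}
```

In practice you would send this with `java.net.http.HttpClient` and deserialize the `ResultDTO` from the response body.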

View File

@@ -8,7 +8,7 @@ ${AnsiColor.GREEN}
 ░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████
 ░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░
 ${AnsiColor.BRIGHT_RED}
-* Maintainer: tengjiqi@gmail.com & PowerJob-Team
+* Maintainer: tengjiqi@gmail.com & Team PowerJob
 * OfficialWebsite: http://www.powerjob.tech/
 * SourceCode: https://github.com/PowerJob/PowerJob
 * PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4)

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,59 @@
+package com.github.kfcfans.powerjob.server.common.utils;
+
+import com.alibaba.fastjson.JSONObject;
+import com.github.kfcfans.powerjob.common.PowerQuery;
+import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
+import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
+import com.github.kfcfans.powerjob.server.service.JobService;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.lang3.time.DateUtils;
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.jpa.domain.Specification;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import javax.annotation.Resource;
+import java.util.Date;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * test QueryConvertUtils
+ *
+ * @author tjq
+ * @since 2021/1/16
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+class QueryConvertUtilsTest {
+
+    @Resource
+    private JobService jobService;
+
+    @Test
+    void autoConvert() {
+        JobInfoQuery jobInfoQuery = new JobInfoQuery();
+        jobInfoQuery.setAppIdEq(1L);
+        jobInfoQuery.setJobNameLike("DAG");
+        jobInfoQuery.setStatusIn(Lists.newArrayList(1));
+        jobInfoQuery.setGmtCreateGt(DateUtils.addDays(new Date(), -300));
+
+        List<JobInfoDTO> list = jobService.queryJob(jobInfoQuery);
+        System.out.println("size: " + list.size());
+        System.out.println(JSONObject.toJSONString(list));
+    }
+
+    @Getter
+    @Setter
+    public static class JobInfoQuery extends PowerQuery {
+        private String jobNameLike;
+        private Date gmtCreateGt;
+        private List<Integer> statusIn;
+    }
+}
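The test above exercises `QueryConvertUtils`, which — judging by the field names in `JobInfoQuery` — maps a suffix naming convention (`Eq`, `Like`, `In`, `Gt`, ...) onto query predicates. A minimal, hypothetical sketch of that convention, not the real implementation (the actual class builds JPA `Specification`s; here the result is just a map of "column OPERATOR" descriptors):

```java
import java.lang.reflect.Field;
import java.util.*;

// Illustrative sketch only: parse "<column><Suffix>" field names via reflection.
public class SuffixQueryDemo {

    static final List<String> SUFFIXES = Arrays.asList("Eq", "Like", "In", "Gt", "Lt");

    /** Walk the query object's fields and emit "column OP" -> value for non-null fields. */
    static Map<String, Object> toConditions(Object query) throws IllegalAccessException {
        Map<String, Object> conditions = new LinkedHashMap<>();
        for (Field f : query.getClass().getDeclaredFields()) {
            f.setAccessible(true);
            Object value = f.get(query);
            if (value == null) {
                continue;   // null fields contribute no predicate
            }
            for (String suffix : SUFFIXES) {
                if (f.getName().endsWith(suffix)) {
                    String column = f.getName().substring(0, f.getName().length() - suffix.length());
                    conditions.put(column + " " + suffix.toUpperCase(), value);
                    break;
                }
            }
        }
        return conditions;
    }

    // Hypothetical query object following the naming convention
    static class DemoQuery {
        String jobNameLike = "DAG";
        Long appIdEq = 1L;
        List<Integer> statusIn = Collections.singletonList(1);
    }
}
```

Under this convention, `jobNameLike` yields a `jobName LIKE` predicate, `appIdEq` an `appId EQ` predicate, and so on.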

View File

@@ -1,7 +1,7 @@
 package com.github.kfcfans.powerjob.server.test;

 import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
-import com.github.kfcfans.powerjob.server.service.lock.LockService;
+import com.github.kfcfans.powerjob.server.extension.LockService;
 import com.github.kfcfans.powerjob.server.service.timing.CleanService;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -32,8 +32,8 @@ public class ServiceTest {
     public void testLockService() {
         String lockName = "myLock";
-        lockService.lock(lockName, 10000);
-        lockService.lock(lockName, 10000);
+        lockService.tryLock(lockName, 10000);
+        lockService.tryLock(lockName, 10000);
         lockService.unlock(lockName);
     }
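The rename from `lock` to `tryLock` suggests a non-blocking acquire that reports success or failure rather than waiting. A rough in-memory sketch of that contract — the real `LockService` is a server-side extension point (typically database-backed); names and semantics here are illustrative only:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative, in-memory tryLock/unlock with a max hold time.
public class InMemoryLockService {

    private final ConcurrentMap<String, Long> lockName2ExpireTime = new ConcurrentHashMap<>();

    /** Non-blocking: true only for the first caller, until release or expiry. */
    public boolean tryLock(String name, long maxLockTimeMs) {
        long now = System.currentTimeMillis();
        Long expire = lockName2ExpireTime.putIfAbsent(name, now + maxLockTimeMs);
        if (expire == null) {
            return true;                            // acquired
        }
        if (expire < now) {
            // previous holder expired; atomically take over
            return lockName2ExpireTime.replace(name, expire, now + maxLockTimeMs);
        }
        return false;                               // still held by someone else
    }

    public void unlock(String name) {
        lockName2ExpireTime.remove(name);
    }
}
```

This mirrors the test above: the second `tryLock` on the same name simply fails instead of blocking or re-entering.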

View File

@@ -14,14 +14,15 @@ spring.servlet.multipart.max-request-size=209715200
 ####### Database configuration #######
 spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
-spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/oms-daily?useUnicode=true&characterEncoding=UTF-8
+spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
 spring.datasource.core.username=root
 spring.datasource.core.password=No1Bug2Please3!
 spring.datasource.core.hikari.maximum-pool-size=20
 spring.datasource.core.hikari.minimum-idle=5
 ####### MongoDB configuration (optional dependency, can be removed) #######
-spring.data.mongodb.uri=mongodb://remotehost:27017/oms-daily
+oms.mongodb.enable=true
+spring.data.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority
 ###### OhMyScheduler's own configuration (only allowed in the application.properties file) ######
 # akka ActorSystem service port

View File

@@ -10,12 +10,12 @@
     <modelVersion>4.0.0</modelVersion>
     <artifactId>powerjob-worker-agent</artifactId>
-    <version>3.4.3</version>
+    <version>3.4.4</version>
     <packaging>jar</packaging>

     <properties>
-        <powerjob.worker.version>3.4.3</powerjob.worker.version>
+        <powerjob.worker.version>3.4.4</powerjob.worker.version>
         <logback.version>1.2.3</logback.version>
         <picocli.version>4.3.2</picocli.version>

View File

@@ -10,11 +10,11 @@
     <modelVersion>4.0.0</modelVersion>
     <artifactId>powerjob-worker-samples</artifactId>
-    <version>3.4.3</version>
+    <version>3.4.4</version>

     <properties>
         <springboot.version>2.2.6.RELEASE</springboot.version>
-        <powerjob.worker.starter.version>3.4.3</powerjob.worker.starter.version>
+        <powerjob.worker.starter.version>3.4.4</powerjob.worker.starter.version>
         <fastjson.version>1.2.68</fastjson.version>
         <!-- Skip this module during deployment -->

View File

@@ -10,11 +10,11 @@
     <modelVersion>4.0.0</modelVersion>
     <artifactId>powerjob-worker-spring-boot-starter</artifactId>
-    <version>3.4.3</version>
+    <version>3.4.4</version>
     <packaging>jar</packaging>

     <properties>
-        <powerjob.worker.version>3.4.3</powerjob.worker.version>
+        <powerjob.worker.version>3.4.4</powerjob.worker.version>
         <springboot.version>2.2.6.RELEASE</springboot.version>
     </properties>

View File

@@ -10,12 +10,12 @@
     <modelVersion>4.0.0</modelVersion>
     <artifactId>powerjob-worker</artifactId>
-    <version>3.4.3</version>
+    <version>3.4.4</version>
     <packaging>jar</packaging>

     <properties>
         <spring.version>5.2.4.RELEASE</spring.version>
-        <powerjob.common.version>3.4.3</powerjob.common.version>
+        <powerjob.common.version>3.4.4</powerjob.common.version>
         <h2.db.version>1.4.200</h2.db.version>
         <hikaricp.version>3.4.2</hikaricp.version>
         <junit.version>5.6.1</junit.version>

View File

@@ -198,8 +198,8 @@ public class ProcessorTracker {
         int poolSize = calThreadPoolSize();
         // Pending-task queue; keep the in-memory queue small to avoid heavy memory pressure
         BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE);
-        // Custom thread names for the pool
-        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build();
+        // Custom thread names for the pool (PowerJob Processor Pool -> PPP)
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPP-%d").build();
         // Rejection policy: throw an exception directly
         RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy();
@@ -214,8 +214,8 @@ public class ProcessorTracker {
      */
     private void initTimingJob() {
-        // Full name: oms-ProcessTracker-TimingPool
-        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-ProcessorTrackerTimingPool-%d").build();
+        // PowerJob Processor TimingPool
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPT-%d").build();
         timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);
         timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS);
@@ -340,13 +340,17 @@ public class ProcessorTracker {
         ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
         ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType());

-        if (executeType == ExecuteType.MAP_REDUCE) {
-            return instanceInfo.getThreadConcurrency();
-        }
         // Script processors have their own pools; still allocate one token thread to simplify the logic
         if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) {
             return 1;
         }
+        if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) {
+            return instanceInfo.getThreadConcurrency();
+        }
+        if (TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) {
+            return instanceInfo.getThreadConcurrency();
+        }
         return 2;
     }
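The reordering above changes behavior for script jobs: the processor-type check now runs first, so a shell or python MapReduce job gets a single token thread rather than its configured concurrency, and MAP plus frequent (second-level) jobs now also honor the configured concurrency. A standalone sketch of the new decision order (enums and parameters simplified from the real `ProcessorTracker` fields):

```java
// Simplified model of the reordered calThreadPoolSize logic; names are illustrative.
public class PoolSizeDemo {

    enum ExecuteType { STANDALONE, BROADCAST, MAP_REDUCE, MAP }
    enum ProcessorType { EMBEDDED_JAVA, SHELL, PYTHON }

    static int calThreadPoolSize(ExecuteType executeType, ProcessorType processorType,
                                 boolean frequent, int threadConcurrency) {
        // Script processors run in their own process; allocate a single token thread.
        if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) {
            return 1;
        }
        // Map/MapReduce jobs honor the configured concurrency.
        if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) {
            return threadConcurrency;
        }
        // Frequent (second-level) jobs do as well.
        if (frequent) {
            return threadConcurrency;
        }
        // Everything else gets a small fixed pool.
        return 2;
    }
}
```

Note the practical effect of the reorder: before the change, a SHELL MapReduce job would have been sized by `threadConcurrency`; now it gets 1.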

View File

@@ -57,7 +57,8 @@ public class CommonTaskTracker extends TaskTracker {
         persistenceRootTask();

         // Start the periodic status check
-        scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 13, 13, TimeUnit.SECONDS);
+        int delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "13"));
+        scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 3, delay, TimeUnit.SECONDS);

         // For MapReduce jobs, start the dynamic executor-detection mechanism
         ExecuteType executeType = ExecuteType.valueOf(req.getExecuteType());
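The hard-coded 13-second check period above becomes tunable through a JVM system property, with 13 kept as the default. A minimal sketch of the pattern — the property key string here is an assumption for the demo; the real key is the constant `PowerJobDKey.WORKER_STATUS_CHECK_PERIOD`:

```java
// Read a tunable period from a system property, falling back to a default.
public class PropertyPeriodDemo {
    // Hypothetical key; the real value lives in PowerJobDKey.WORKER_STATUS_CHECK_PERIOD.
    static final String WORKER_STATUS_CHECK_PERIOD = "powerjob.worker.status-check-period";

    static int statusCheckDelaySeconds() {
        return Integer.parseInt(System.getProperty(WORKER_STATUS_CHECK_PERIOD, "13"));
    }
}
```

Operators could then shorten the period with e.g. `-Dpowerjob.worker.status-check-period=5` (key name assumed) without rebuilding the worker.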

View File

@@ -173,7 +173,7 @@ public class FrequentTaskTracker extends TaskTracker {
         // Check whether the maximum number of running sub-instances has been exceeded
         if (maxInstanceNum > 0) {
             if (timeExpressionType == TimeExpressionType.FIXED_RATE) {
-                if (subInstanceId2TimeHolder.size() > maxInstanceNum) {
+                if (subInstanceId2TimeHolder.size() >= maxInstanceNum) {
                     log.warn("[FQTaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId);
                     processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE");
                     return;
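The `>` to `>=` change fixes an off-by-one: with `>`, a FIXED_RATE job would still launch a new sub-instance when exactly `maxInstanceNum` were already running, briefly allowing one more than the limit. The corrected boundary can be sketched as:

```java
// Simplified boundary check mirroring the fixed condition (names illustrative).
public class MaxInstanceDemo {
    /** maxInstanceNum <= 0 means unlimited; otherwise reject once the limit is reached. */
    static boolean shouldReject(int runningSubInstances, int maxInstanceNum) {
        return maxInstanceNum > 0 && runningSubInstances >= maxInstanceNum;
    }
}
```

With the old `>` comparison, `shouldReject(3, 3)` would have been false, letting a fourth sub-instance start against a limit of 3.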