diff --git a/.gitignore b/.gitignore index 15e8d172..47d2017f 100644 --- a/.gitignore +++ b/.gitignore @@ -34,4 +34,4 @@ build/ *.jar *.log */.DS_Store -.DS_Store +.DS_Store \ No newline at end of file diff --git a/LICENSE b/LICENSE index 261eeb9e..e69d0591 100644 --- a/LICENSE +++ b/LICENSE @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright [2021] [PowerJob] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/README.md b/README.md index d772fd93..0ccf1b81 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -English | [简体中文](./README_zhCN.md) +### English | [简体中文](./README_zhCN.md)

PowerJob @@ -11,73 +11,59 @@ English | [简体中文](./README_zhCN.md) LICENSE

-- Have you ever wondered how cron jobs could be organized orderly? -- Have you ever felt upset about tasks that carry with complex dependencies? -- Have you ever felt helpless when scheduling tasks suddenly terminated without any warning? -- Have you ever felt depressed when batches of business tasks need to be processed in a distributed manner? +[PowerJob](https://github.com/PowerJob/PowerJob) is an open-source distributed computing and job scheduling framework which allows developers to easily schedule tasks in their own application. -Well, PowerJob is there for you, it is the choice of a new generation. It is a powerful, business-oriented scheduling framework that provides distributed computing ability. Based on Akka architecture, it makes everything with scheduling easier. Just with several steps, PowerJob could be deployed and work for you! +Refer to [PowerJob Introduction](https://www.yuque.com/powerjob/en/introduce) for detailed information. # Introduction ### Features -- Simple to use: PowerJob provides a friendly front-end Web that allows developers to visually manage tasks, monitor tasks, and view logs online. -- Complete timing strategy: PowerJob supports four different scheduling strategies, including CRON expression, fixed frequency timing, fixed delay timing as well as the Open API. -- Various execution modes: PowerJob supports four execution modes: stand-alone, broadcast, Map, and MapReduce. **It's worth mentioning the Map and MapReduce modes. With several lines of codes, developers could take full advantage of PowerJob's distributed computing ability**. -- Complete workflow support: PowerJob supports DAG(Directed acyclic graph) based online task configuration. Developers could arrange tasks on the console, while data could be transferred among tasks on the flow. -- Extensive executor support: PowerJob supports multiple processors, including Spring Beans, ordinary Java objects, Shell, Python and so on. -- Simple in dependency: PowerJob aims to be simple in dependency. The only dependency is merely database (MySQL / Oracle / MS SQLServer ...), with MongoDB being the extra dependency for storing large log files. -- High availability and performance: Unlike traditional job-scheduling frameworks that rely on database locks, PowerJob server is lock-free. PowerJob supports unlimited horizontal expansion. It's easy to achieve high availability and performance by deploying as many PowerJob server instances as you need. -- Quick failover and recovery support: Whenever any task failed, PowerJob server would retry according to the configured strategy. As long as there were enough nodes in the cluster, the failed tasks could execute successfully finally. +- **Friendly UI:** [Front-end](http://try.powerjob.tech/#/welcome?appName=powerjob-agent-test&password=123) page is provided and developers can manage their task, monitor the status, check the logs online, etc. + +- **Abundant Timing Strategies:** Four timing strategies are supported, including CRON expression, fixed rate, fixed delay and OpenAPI which allows you to define your own scheduling policies, such as delaying execution. + +- **Multiple Execution Mode:** Four execution modes are supported, including stand-alone, broadcast, Map and MapReduce. Distributed computing resource could be utilized in MapReduce mode, try the magic out [here](https://www.yuque.com/powerjob/en/za1d96#9YOnV)! + +- **Workflow(DAG) Support:** Both job dependency management and data communications between jobs are supported. + +- **Extensive Processor Support:** Developers can write their processors in Java, Shell, Python, and will subsequently support multilingual scheduling via HTTP. + +- **Powerful Disaster Tolerance:** As long as there are enough computing nodes, configurable retry policies make it possible for your task to be executed and finished successfully. + +- **High Availability & High Performance:** PowerJob supports unlimited horizontal expansion. It's easy to achieve high availability and performance by deploying as many PowerJob server and worker nodes. ### Applicable scenes -- Scenarios with timed tasks: such as full synchronization of data at midnight, generating business reports at desired time. -- Scenarios that require all machines to run tasks simultaneously: such as log cleanup. -- Scenarios that require distributed processing: For example, a large amount of data requires updating, while the stand-alone execution takes quite a lot of time. The Map/MapReduce mode could be applied in which the workers would join the cluster for PowerJob server to dispatch, to speed up the time-consuming process, therefore improving the computing ability of the whole cluster. -- **Scenarios with delayed tasks**: For instance, disposal of overdue orders. - -### Design goals - -PowerJob aims to be an enterprise scheduling middleware. By deploying PowerJob-server as the scheduling center, -all the applications could gain scheduling and distributed computing ability relying on PowerJob-worker. +- Timed tasks, for example, allocating e-coupons on 9 AM every morning. +- Broadcast tasks, for example, broadcasting to the cluster to clear logs. +- MapReduce tasks, for example, speeding up certain job like updating large amounts of data. +- Delayed tasks, for example, processing overdue orders. +- Customized tasks, triggered with [OpenAPI](https://www.yuque.com/powerjob/en/openapi). ### Online trial +- Address: [try.powerjob.tech](http://try.powerjob.tech/#/welcome?appName=powerjob-agent-test&password=123) +- Recommended to read the documentation first: [here](https://www.yuque.com/powerjob/en/trial) -Trial address: [Online Trial Address](http://try.powerjob.tech/) -Application name: powerjob-agent-test -Application password: 123 - -### Comparison with similar products - -| | QuartZ | PowerJob | -| ---------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ | -| Timing type | CRON | **CRON, fixed frequency, fixed delay, OpenAPI** | -| Task type | Built-in Java | **Built-in Java, external Java (JVM Container), Shell, Python and other scripts** | -| Distributed strategy | Unsupported | **MapReduce dynamic sharding** | -| Online task management | Unsupported | **Supported** | -| Online logging | Unsupported | **Supported** | -| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | **Lock-free design, high performance without upper limit** | -| Alarm monitoring | Unsupported | **Email, WebHook, DingTalk. An interface is provided for customization.** | -| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** | -| workflow | Unsupported | **Supported** | - -# Document +# Documents **[Docs](https://www.yuque.com/powerjob/en/introduce)** -**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** +**[中文文档](https://www.yuque.com/powerjob/guidence/intro)** -# User Registration -[Click to register as PowerJob user and contribute to PowerJob!](https://github.com/PowerJob/PowerJob/issues/6) +# Known Users +[Click to register as PowerJob user!](https://github.com/PowerJob/PowerJob/issues/6) ღ( ´・ᴗ・\` )ღ Many thanks to the following registered users. ღ( ´・ᴗ・\` )ღ

PowerJob User

+# License + +PowerJob is released under Apache License 2.0. Please refer to [License](./LICENSE) for details. # Others -- Welcome to the Gitter Community: [LINK](https://gitter.im/PowerJob/community) -- PowerJob is permanently open source software(Apache License, Version 2.0), please feel free to try, deploy and put into production! -- Welcome to contribute to PowerJob, both Pull Requests and Issues are precious. -- Please STAR PowerJob if it is valuable. ~ =  ̄ω ̄ = -- Do you need any help or want to propose suggestions? Please raise Github issues or contact the Author @KFCFans-> `tengjiqi@gmail.com` directly. \ No newline at end of file + +- Any developer interested in getting more involved in PowerJob may join our [Gitter Community](https://gitter.im/PowerJob/community) and make [contributions](https://github.com/PowerJob/PowerJob/pulls)! + +- Reach out to me through email **tengjiqi@gmail.com**. Any issues or questions are welcomed on [Issues](https://github.com/PowerJob/PowerJob/issues). + +- Look forward to your opinions. Response may be late but not denied. diff --git a/README_zhCN.md b/README_zhCN.md index ed6fa4e9..c461f266 100644 --- a/README_zhCN.md +++ b/README_zhCN.md @@ -1,4 +1,4 @@ -[English](./README.md) | 简体中文 +### [English](./README.md) | 简体中文

PowerJob @@ -34,11 +34,8 @@ PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架, PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server,旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。 ### 在线试用 -试用地址:[try.powerjob.tech](http://try.powerjob.tech/) -试用应用名称:powerjob-agent-test -控制台密码:123 - -[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn) +* 试用地址:[try.powerjob.tech](http://try.powerjob.tech/#/welcome?appName=powerjob-agent-test&password=123) +* [建议先阅读使用教程了解 PowerJob 的概念和基本用法](https://www.yuque.com/powerjob/guidence/trial) ### 同类产品对比 | | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | @@ -55,11 +52,9 @@ PowerJob 的设计目标为企业级的分布式任务调度平台,即成为 # 官方文档 -**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** +**[中文文档](https://www.yuque.com/powerjob/guidence/intro)** -**[Document](https://www.yuque.com/powerjob/en/xrdoqw)** - -PS:感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献! +**[Docs](https://www.yuque.com/powerjob/en/introduce)** # 接入登记 [点击进行接入登记,为 PowerJob 的发展贡献自己的力量!](https://github.com/PowerJob/PowerJob/issues/6) diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index 4b2b8795..4cd4c85d 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-client - 3.4.3 + 3.4.4 jar 5.6.1 1.2.68 - 3.4.3 + 3.4.4 3.2.4 diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index 7b049b1e..33291d45 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -1,12 +1,10 @@ package com.github.kfcfans.powerjob.client; import com.alibaba.fastjson.JSONObject; -import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.OmsConstant; -import com.github.kfcfans.powerjob.common.OpenAPIConstant; -import com.github.kfcfans.powerjob.common.PowerJobException; +import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; +import com.github.kfcfans.powerjob.common.request.query.JobInfoQuery; import com.github.kfcfans.powerjob.common.response.*; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.HttpUtils; @@ -111,7 +109,7 @@ public class OhMyClient { public ResultDTO saveJob(SaveJobInfoRequest request) { request.setAppId(appId); - MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); + MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE); String json = JSONObject.toJSONString(request); String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json)); return JSONObject.parseObject(post, LONG_RESULT_TYPE); @@ -131,6 +129,31 @@ public class OhMyClient { return JSONObject.parseObject(post, JOB_RESULT_TYPE); } + /** + * Query all JobInfo + * @return All JobInfo + */ + public ResultDTO> fetchAllJob() { + RequestBody body = new FormBody.Builder() + .add("appId", appId.toString()) + .build(); + String post = postHA(OpenAPIConstant.FETCH_ALL_JOB, body); + return JSONObject.parseObject(post, LIST_JOB_RESULT_TYPE); + } + + /** + * Query JobInfo by PowerQuery + * @param powerQuery JobQuery + * @return JobInfo + */ + public ResultDTO> queryJob(JobInfoQuery powerQuery) { + powerQuery.setAppIdEq(appId); + MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE); + String json = JsonUtils.toJSONStringUnsafe(powerQuery); + String post = postHA(OpenAPIConstant.QUERY_JOB, RequestBody.create(jsonType, json)); + return JSONObject.parseObject(post, LIST_JOB_RESULT_TYPE); + } + /** * Disable one Job by jobId * @param jobId jobId diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/TypeStore.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/TypeStore.java index 0c680e79..ab60f1b5 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/TypeStore.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/TypeStore.java @@ -3,6 +3,8 @@ package com.github.kfcfans.powerjob.client; import com.alibaba.fastjson.TypeReference; import com.github.kfcfans.powerjob.common.response.*; +import java.util.List; + /** * TypeReference store. * @@ -19,8 +21,12 @@ public class TypeStore { public static final TypeReference> JOB_RESULT_TYPE = new TypeReference>(){}; + public static final TypeReference>> LIST_JOB_RESULT_TYPE = new TypeReference>>(){}; + public static final TypeReference> INSTANCE_RESULT_TYPE = new TypeReference>() {}; + public static final TypeReference>> LIST_INSTANCE_RESULT_TYPE = new TypeReference>>(){}; + public static final TypeReference> WF_RESULT_TYPE = new TypeReference>() {}; public static final TypeReference> WF_INSTANCE_RESULT_TYPE = new TypeReference>() {}; diff --git a/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/ClientInitializer.java b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/ClientInitializer.java new file mode 100644 index 00000000..4198ae6a --- /dev/null +++ b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/ClientInitializer.java @@ -0,0 +1,20 @@ +package com.github.kfcfans.powerjob.client.test; + +import com.github.kfcfans.powerjob.client.OhMyClient; +import org.junit.jupiter.api.BeforeAll; + +/** + * Initialize OhMyClient + * + * @author tjq + * @since 1/16/21 + */ +public class ClientInitializer { + + protected static OhMyClient ohMyClient; + + @BeforeAll + public static void initClient() throws Exception { + ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123"); + } +} diff --git a/powerjob-client/src/test/java/TestClient.java b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestClient.java similarity index 94% rename from powerjob-client/src/test/java/TestClient.java rename to powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestClient.java index 61fae077..00db46f6 100644 --- a/powerjob-client/src/test/java/TestClient.java +++ b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestClient.java @@ -1,3 +1,5 @@ +package com.github.kfcfans.powerjob.client.test; + import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.ProcessorType; @@ -17,17 +19,10 @@ import java.util.concurrent.TimeUnit; * @author tjq * @since 2020/4/15 */ -public class TestClient { - - private static OhMyClient ohMyClient; +class TestClient extends ClientInitializer { public static final long JOB_ID = 4L; - @BeforeAll - public static void initClient() throws Exception { - ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123"); - } - @Test public void testSaveJob() throws Exception { diff --git a/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestConcurrencyControl.java b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestConcurrencyControl.java new file mode 100644 index 00000000..8f10f1a8 --- /dev/null +++ b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestConcurrencyControl.java @@ -0,0 +1,45 @@ +package com.github.kfcfans.powerjob.client.test; + +import com.github.kfcfans.powerjob.common.ExecuteType; +import com.github.kfcfans.powerjob.common.ProcessorType; +import com.github.kfcfans.powerjob.common.TimeExpressionType; +import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; +import com.github.kfcfans.powerjob.common.response.ResultDTO; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ForkJoinPool; + +/** + * TestConcurrencyControl + * + * @author tjq + * @since 1/16/21 + */ +class TestConcurrencyControl extends ClientInitializer { + + @Test + void testRunJobConcurrencyControl() { + + SaveJobInfoRequest saveJobInfoRequest = new SaveJobInfoRequest(); + saveJobInfoRequest.setJobName("test concurrency control job"); + saveJobInfoRequest.setProcessorType(ProcessorType.SHELL); + saveJobInfoRequest.setProcessorInfo("pwd"); + saveJobInfoRequest.setExecuteType(ExecuteType.STANDALONE); + saveJobInfoRequest.setTimeExpressionType(TimeExpressionType.API); + saveJobInfoRequest.setMaxInstanceNum(1); + + Long jobId = ohMyClient.saveJob(saveJobInfoRequest).getData(); + + System.out.println("jobId: " + jobId); + + ForkJoinPool pool = new ForkJoinPool(32); + + for (int i = 0; i < 100; i++) { + String params = "index-" + i; + pool.execute(() -> { + ResultDTO res = ohMyClient.runJob(jobId, params, 0); + System.out.println(params + ": " + res); + }); + } + } +} diff --git a/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestQuery.java b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestQuery.java new file mode 100644 index 00000000..cf9f2ad3 --- /dev/null +++ b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestQuery.java @@ -0,0 +1,49 @@ +package com.github.kfcfans.powerjob.client.test; + +import com.alibaba.fastjson.JSON; +import com.github.kfcfans.powerjob.common.request.query.JobInfoQuery; +import com.github.kfcfans.powerjob.common.ExecuteType; +import com.github.kfcfans.powerjob.common.ProcessorType; +import com.github.kfcfans.powerjob.common.TimeExpressionType; +import com.github.kfcfans.powerjob.common.response.JobInfoDTO; +import com.github.kfcfans.powerjob.common.response.ResultDTO; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.DateUtils; +import org.junit.jupiter.api.Test; + +import java.util.Date; +import java.util.List; + +/** + * Test the query method + * + * @author tjq + * @since 1/16/21 + */ +@Slf4j +class TestQuery extends ClientInitializer { + + @Test + void testFetchAllJob() { + ResultDTO> allJobRes = ohMyClient.fetchAllJob(); + System.out.println(JSON.toJSONString(allJobRes)); + } + + @Test + void testQueryJob() { + JobInfoQuery jobInfoQuery = new JobInfoQuery() + .idGt(-1L) + .idLt(10086L) + .jobNameLike("DAG") + .gmtModifiedGt(DateUtils.addYears(new Date(), -10)) + .gmtModifiedLt(DateUtils.addDays(new Date(), 10)) + .executeTypeIn(Lists.newArrayList(ExecuteType.STANDALONE.getV(), ExecuteType.BROADCAST.getV(), ExecuteType.MAP_REDUCE.getV())) + .timeExpressionIn(Lists.newArrayList(TimeExpressionType.API.name(), TimeExpressionType.CRON.name(), TimeExpressionType.WORKFLOW.name(), TimeExpressionType.FIXED_RATE.name())) + .processorTypeIn(Lists.newArrayList(ProcessorType.EMBEDDED_JAVA.getV(), ProcessorType.SHELL.getV(), ProcessorType.JAVA_CONTAINER.getV())) + .processorInfoLike("com.github.kfcfans"); + + ResultDTO> jobQueryResult = ohMyClient.queryJob(jobInfoQuery); + System.out.println(JSON.toJSONString(jobQueryResult)); + } +} diff --git a/powerjob-client/src/test/java/TestWorkflow.java b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestWorkflow.java similarity index 92% rename from powerjob-client/src/test/java/TestWorkflow.java rename to powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestWorkflow.java index 71630e7e..2552eae7 100644 --- a/powerjob-client/src/test/java/TestWorkflow.java +++ b/powerjob-client/src/test/java/com/github/kfcfans/powerjob/client/test/TestWorkflow.java @@ -1,3 +1,5 @@ +package com.github.kfcfans.powerjob.client.test; + import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.client.OhMyClient; import com.github.kfcfans.powerjob.common.ExecuteType; @@ -7,7 +9,6 @@ import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.google.common.collect.Lists; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import java.util.List; @@ -18,17 +19,10 @@ import java.util.List; * @author tjq * @since 2020/6/2 */ -public class TestWorkflow { - - private static OhMyClient ohMyClient; +class TestWorkflow extends ClientInitializer { private static final long WF_ID = 1; - @BeforeAll - public static void initClient() throws Exception { - ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123"); - } - @Test public void initTestData() throws Exception { SaveJobInfoRequest base = new SaveJobInfoRequest(); diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index a5dce3e0..3f5fa205 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.4.3 + 3.4.4 jar diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OpenAPIConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OpenAPIConstant.java index 2f4868ef..6db90af3 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OpenAPIConstant.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OpenAPIConstant.java @@ -15,6 +15,8 @@ public class OpenAPIConstant { /* ************* JOB 区 ************* */ public static final String SAVE_JOB = "/saveJob"; public static final String FETCH_JOB = "/fetchJob"; + public static final String FETCH_ALL_JOB = "/fetchAllJob"; + public static final String QUERY_JOB = "/queryJob"; public static final String DISABLE_JOB = "/disableJob"; public static final String ENABLE_JOB = "/enableJob"; public static final String DELETE_JOB = "/deleteJob"; @@ -26,6 +28,7 @@ public class OpenAPIConstant { public static final String RETRY_INSTANCE = "/retryInstance"; public static final String FETCH_INSTANCE_STATUS = "/fetchInstanceStatus"; public static final String FETCH_INSTANCE_INFO = "/fetchInstanceInfo"; + public static final String QUERY_INSTANCE = "/queryInstance"; /* ************* Workflow 区 ************* */ public static final String SAVE_WORKFLOW = "/saveWorkflow"; diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobDKey.java index b849b9e6..f7a1bacf 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobDKey.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerJobDKey.java @@ -21,4 +21,6 @@ public class PowerJobDKey { */ public static final String IGNORED_NETWORK_INTERFACE_REGEX = "powerjob.network.interface.ignored"; + public static final String WORKER_STATUS_CHECK_PERIOD = "powerjob.worker.status-check.normal.period"; + } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerQuery.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerQuery.java new file mode 100644 index 00000000..8b34ed67 --- /dev/null +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/PowerQuery.java @@ -0,0 +1,41 @@ +package com.github.kfcfans.powerjob.common; + +import lombok.Getter; +import lombok.Setter; + +/** + * PowerJob Query interface + * + * @author tjq + * @since 2021/1/15 + */ +@Getter +@Setter +public abstract class PowerQuery { + + public static String EQUAL = "Eq"; + + public static String NOT_EQUAL = "NotEq"; + + public static String LIKE = "Like"; + + public static String NOT_LIKE = "NotLike"; + + public static String LESS_THAN = "Lt"; + + public static String LESS_THAN_EQUAL = "LtEq"; + + public static String GREATER_THAN = "Gt"; + + public static String GREATER_THAN_EQUAL = "GtEq"; + + public static String IN = "In"; + + public static String NOT_IN = "NotIn"; + + public static String IS_NULL = "IsNull"; + + public static String IS_NOT_NULL = "IsNotNull"; + + private Long appIdEq; +} diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java index 5f53fecc..999bc54e 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java @@ -22,6 +22,7 @@ public class SystemInstanceResult { public static final String UNKNOWN_BUG = "unknown bug"; // TaskTracker 长时间未上报 public static final String REPORT_TIMEOUT = "worker report timeout, maybe TaskTracker down"; + public static final String CAN_NOT_FIND_JOB_INFO = "can't find job info"; /* *********** workflow 专用 *********** */ public static final String MIDDLE_JOB_FAILED = "middle job failed"; diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/query/JobInfoQuery.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/query/JobInfoQuery.java new file mode 100644 index 00000000..f1b2bf9d --- /dev/null +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/query/JobInfoQuery.java @@ -0,0 +1,53 @@ +package com.github.kfcfans.powerjob.common.request.query; + +import com.github.kfcfans.powerjob.common.PowerQuery; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; + +import java.util.Date; +import java.util.List; + +/** + * Query JobInfo + * + * @author tjq + * @since 1/16/21 + */ +@Getter +@Setter +@Accessors(chain = true, fluent = true) +public class JobInfoQuery extends PowerQuery { + + private Long idEq; + private Long idLt; + private Long idGt; + + private String jobNameEq; + private String jobNameLike; + + private String jobDescriptionLike; + + private String jobParamsLike; + + private List timeExpressionTypeIn; + private List timeExpressionIn; + private List executeTypeIn; + private List processorTypeIn; + + private String processorInfoEq; + private String processorInfoLike; + + private List statusIn; + private Long nextTriggerTimeGt; + private Long nextTriggerTimeLt; + + private String notifyUserIdsLike; + + private Date gmtCreateLt; + private Date gmtCreateGt; + + private Date gmtModifiedLt; + private Date gmtModifiedGt; + +} diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 870ab105..b637e898 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.4.3 + 3.4.4 jar 2.9.2 2.3.4.RELEASE - 3.4.3 + 3.4.4 8.0.19 19.7.0.0 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/AOPUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/AOPUtils.java new file mode 100644 index 00000000..2f0b5067 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/AOPUtils.java @@ -0,0 +1,63 @@ +package com.github.kfcfans.powerjob.server.common.utils; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.Signature; +import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.core.LocalVariableTableParameterNameDiscoverer; +import org.springframework.core.ParameterNameDiscoverer; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; + +import java.lang.reflect.Method; + +/** + * AOP Utils + * + * @author tjq + * @since 1/16/21 + */ +@Slf4j +public class AOPUtils { + + private static final ExpressionParser parser = new SpelExpressionParser(); + private static final ParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer(); + + public static Method parseMethod(ProceedingJoinPoint joinPoint) { + Signature pointSignature = joinPoint.getSignature(); + if (!(pointSignature instanceof MethodSignature)) { + throw new IllegalArgumentException("this annotation should be used on a method!"); + } + MethodSignature signature = (MethodSignature) pointSignature; + Method method = signature.getMethod(); + if (method.getDeclaringClass().isInterface()) { + try { + method = joinPoint.getTarget().getClass().getDeclaredMethod(pointSignature.getName(), method.getParameterTypes()); + } catch (SecurityException | NoSuchMethodException e) { + ExceptionUtils.rethrow(e); + } + } + return method; + } + + public static T parseSpEl(Method method, Object[] arguments, String spEl, Class clazz, T defaultResult) { + String[] params = discoverer.getParameterNames(method); + assert params != null; + + EvaluationContext context = new StandardEvaluationContext(); + for (int len = 0; len < params.length; len++) { + context.setVariable(params[len], arguments[len]); + } + try { + Expression expression = parser.parseExpression(spEl); + return expression.getValue(context, clazz); + } catch (Exception e) { + log.error("[AOPUtils] parse SpEL failed for method[{}], please concat @tjq to fix the bug!", method.getName(), e); + return defaultResult; + } + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/QueryConvertUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/QueryConvertUtils.java new file mode 100644 index 00000000..2f8b4ebd --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/QueryConvertUtils.java @@ -0,0 +1,104 @@ +package com.github.kfcfans.powerjob.server.common.utils; + +import com.alibaba.fastjson.JSONArray; +import com.github.kfcfans.powerjob.common.PowerJobException; +import com.github.kfcfans.powerjob.common.PowerQuery; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.data.jpa.domain.Specification; + +import javax.persistence.criteria.*; +import java.lang.reflect.Field; +import java.util.List; + +/** + * auto convert query to Specification + * + * @author tjq + * @since 2021/1/15 + */ +@Slf4j +@SuppressWarnings("unchecked, rawtypes") +public class QueryConvertUtils { + + public static Specification toSpecification(PowerQuery powerQuery) { + + return (Specification) (root, query, cb) -> { + List predicates = Lists.newLinkedList(); + Field[] fields = powerQuery.getClass().getDeclaredFields(); + try { + for (Field field : fields) { + field.setAccessible(true); + String fieldName = field.getName(); + Object fieldValue = field.get(powerQuery); + if (fieldValue == null) { + continue; + } + if (fieldName.endsWith(PowerQuery.EQUAL)) { + String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.EQUAL); + predicates.add(cb.equal(root.get(colName), fieldValue)); + } else if (fieldName.endsWith(PowerQuery.NOT_EQUAL)) { + String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.NOT_EQUAL); + predicates.add(cb.notEqual(root.get(colName), fieldValue)); + } else if (fieldName.endsWith(PowerQuery.LIKE)) { + String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.LIKE); + predicates.add(cb.like(root.get(colName), convertLikeParams(fieldValue))); + } else if (fieldName.endsWith(PowerQuery.NOT_LIKE)) { + String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.NOT_LIKE); + predicates.add(cb.notLike(root.get(colName), convertLikeParams(fieldValue))); + } else if (fieldName.endsWith(PowerQuery.LESS_THAN)) { + String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.LESS_THAN); + predicates.add(cb.lessThan(root.get(colName), (Comparable)fieldValue)); + } else if (fieldName.endsWith(PowerQuery.GREATER_THAN)) { + String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.GREATER_THAN); + predicates.add(cb.greaterThan(root.get(colName), (Comparable)fieldValue)); + } else if (fieldName.endsWith(PowerQuery.LESS_THAN_EQUAL)) { + String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.LESS_THAN_EQUAL); + predicates.add(cb.lessThanOrEqualTo(root.get(colName), (Comparable)fieldValue)); + } else if (fieldName.endsWith(PowerQuery.GREATER_THAN_EQUAL)) { + String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.GREATER_THAN_EQUAL); + predicates.add(cb.greaterThanOrEqualTo(root.get(colName), (Comparable)fieldValue)); + } else if (fieldName.endsWith(PowerQuery.IN)) { + String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.IN); + predicates.add(root.get(colName).in(convertInParams(fieldValue))); + } else if (fieldName.endsWith(PowerQuery.NOT_IN)) { + String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.NOT_IN); + predicates.add(cb.not(root.get(colName).in(convertInParams(fieldValue)))); + } else if (fieldName.endsWith(PowerQuery.IS_NULL)) { + String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.IS_NULL); + predicates.add(cb.isNull(root.get(colName))); + } else if (fieldName.endsWith(PowerQuery.IS_NOT_NULL)) { + String colName = StringUtils.substringBeforeLast(fieldName, PowerQuery.IS_NOT_NULL); + predicates.add(cb.isNotNull(root.get(colName))); + } + } + } catch (Exception e) { + log.warn("[QueryConvertUtils] convert failed for query: {}", query, e); + throw new PowerJobException("convert query object failed, maybe you should redesign your query object!"); + } + + if (powerQuery.getAppIdEq() != null) { + predicates.add(cb.equal(root.get("appId"), powerQuery.getAppIdEq())); + } + + return query.where(predicates.toArray(new Predicate[0])).getRestriction(); + }; + } + + private static String convertLikeParams(Object o) { + String s = (String) o; + if (!s.startsWith("%")) { + s = "%" + s; + } + if (!s.endsWith("%")) { + s = s + "%"; + } + return s; + } + + private static Object[] convertInParams(Object o) { + // FastJSON, 永远滴神! + return JSONArray.parseArray(JSONArray.toJSONString(o)).toArray(); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/LockService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/extension/LockService.java similarity index 82% rename from powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/LockService.java rename to powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/extension/LockService.java index 31536626..d6b89d70 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/LockService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/extension/LockService.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.powerjob.server.service.lock; +package com.github.kfcfans.powerjob.server.extension; /** * 锁服务,所有方法都不允许抛出任何异常! @@ -14,7 +14,7 @@ public interface LockService { * @param maxLockTime 最长持有锁的时间,单位毫秒(ms) * @return true -> 获取到锁,false -> 未获取到锁 */ - boolean lock(String name, long maxLockTime); + boolean tryLock(String name, long maxLockTime); /** * 释放锁 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java index 3742e3c8..0872cbab 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java @@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.persistence.core.repository; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.google.errorprone.annotations.CanIgnoreReturnValue; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.JpaSpecificationExecutor; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; @@ -16,7 +17,7 @@ import java.util.List; * @author tjq * @since 2020/4/1 */ -public interface InstanceInfoRepository extends JpaRepository { +public interface InstanceInfoRepository extends JpaRepository, JpaSpecificationExecutor { /** * 统计当前JOB有多少实例正在运行 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java index 1e033282..146b3711 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java @@ -4,6 +4,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.JpaSpecificationExecutor; import org.springframework.data.jpa.repository.Query; import java.util.List; @@ -14,7 +15,7 @@ import java.util.List; * @author tjq * @since 2020/4/1 */ -public interface JobInfoRepository extends JpaRepository { +public interface JobInfoRepository extends JpaRepository, JpaSpecificationExecutor { // 调度专用 @@ -32,4 +33,6 @@ public interface JobInfoRepository extends JpaRepository { long countByAppIdAndStatusNot(long appId, int status); + List findByAppId(Long appId); + } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java index 0f10b0b4..55e5fcd5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java @@ -18,7 +18,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository; import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; -import com.github.kfcfans.powerjob.server.service.lock.LockService; +import com.github.kfcfans.powerjob.server.extension.LockService; import com.github.kfcfans.powerjob.server.web.request.SaveContainerInfoRequest; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; @@ -209,7 +209,7 @@ public class ContainerService { String deployLock = "containerDeployLock-" + containerId; RemoteEndpoint.Async remote = session.getAsyncRemote(); // 最长部署时间:10分钟 - boolean lock = lockService.lock(deployLock, 10 * 60 * 1000); + boolean lock = lockService.tryLock(deployLock, 10 * 60 * 1000); if (!lock) { remote.sendText("SYSTEM: acquire deploy lock failed, maybe other user is deploying, please wait until the running deploy task finished."); return; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index 6399a72f..43b0fe5c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -10,6 +10,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; import com.github.kfcfans.powerjob.server.service.instance.InstanceMetadataService; +import com.github.kfcfans.powerjob.server.service.lock.local.UseSegmentLock; import com.google.common.base.Splitter; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -46,6 +47,7 @@ public class DispatchService { private static final Splitter commaSplitter = Splitter.on(","); + @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024) public void redispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) { InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); dispatch(jobInfo, instanceId, currentRunningTimes, instanceInfo.getInstanceParams(), instanceInfo.getWfInstanceId()); @@ -59,6 +61,7 @@ public class DispatchService { * @param instanceParams 实例的运行参数,API触发方式专用 * @param wfInstanceId 工作流任务实例ID,workflow 任务专用 */ + @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024) public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes, String instanceParams, Long wfInstanceId) { Long jobId = jobInfo.getId(); log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, instanceParams); @@ -76,15 +79,20 @@ public class DispatchService { // 查询当前运行的实例数 long current = System.currentTimeMillis(); - // 0 代表不限制在线任务,还能省去一次 DB 查询 Integer maxInstanceNum = jobInfo.getMaxInstanceNum(); + // 秒级任务只派发到一台机器,具体的 maxInstanceNum 由 TaskTracker 控制 + if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) { + maxInstanceNum = 1; + } + + // 0 代表不限制在线任务,还能省去一次 DB 查询 if (maxInstanceNum > 0) { - // 这个 runningInstanceCount 已经包含了本 instance // 不统计 WAITING_DISPATCH 的状态:使用 OpenAPI 触发的延迟任务不应该统计进去(比如 delay 是 1 天) + // 由于不统计 WAITING_DISPATCH,所以这个 runningInstanceCount 不包含本任务自身 long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV())); // 超出最大同时运行限制,不执行调度 - if (runningInstanceCount > maxInstanceNum) { + if (runningInstanceCount >= maxInstanceNum) { String result = String.format(SystemInstanceResult.TOO_MANY_INSTANCES, runningInstanceCount, maxInstanceNum); log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance is running ({} > {}).", jobId, instanceId, runningInstanceCount, maxInstanceNum); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index f2e35534..8525846e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.service; import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.PowerJobException; +import com.github.kfcfans.powerjob.common.PowerQuery; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.response.JobInfoDTO; @@ -9,6 +10,7 @@ import com.github.kfcfans.powerjob.server.common.SJ; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.redirect.DesignateServer; import com.github.kfcfans.powerjob.server.common.utils.CronExpression; +import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; @@ -17,6 +19,7 @@ import com.github.kfcfans.powerjob.server.service.instance.InstanceService; import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; +import org.springframework.data.jpa.domain.Specification; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -24,6 +27,7 @@ import javax.annotation.Resource; import java.util.Date; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; /** * 任务服务 @@ -89,10 +93,16 @@ public class JobService { } public JobInfoDTO fetchJob(Long jobId) { - JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId)); - JobInfoDTO jobInfoDTO = new JobInfoDTO(); - BeanUtils.copyProperties(jobInfoDO, jobInfoDTO); - return jobInfoDTO; + return convert(jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId))); + } + + public List fetchAllJob(Long appId) { + return jobInfoRepository.findByAppId(appId).stream().map(JobService::convert).collect(Collectors.toList()); + } + + public List queryJob(PowerQuery powerQuery) { + Specification specification = QueryConvertUtils.toSpecification(powerQuery); + return jobInfoRepository.findAll(specification).stream().map(JobService::convert).collect(Collectors.toList()); } /** @@ -225,4 +235,10 @@ public class JobService { } } + private static JobInfoDTO convert(JobInfoDO jobInfoDO) { + JobInfoDTO jobInfoDTO = new JobInfoDTO(); + BeanUtils.copyProperties(jobInfoDO, jobInfoDTO); + return jobInfoDTO; + } + } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java index 50401c72..37ae701e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java @@ -8,7 +8,7 @@ import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.akka.requests.Ping; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; -import com.github.kfcfans.powerjob.server.service.lock.LockService; +import com.github.kfcfans.powerjob.server.extension.LockService; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -76,7 +76,7 @@ public class ServerSelectService { // 无可用Server,重新进行Server选举,需要加锁 String lockName = String.format(SERVER_ELECT_LOCK, appId); - boolean lockStatus = lockService.lock(lockName, 30000); + boolean lockStatus = lockService.tryLock(lockName, 30000); if (!lockStatus) { try { Thread.sleep(500); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java index 48e825f8..338d540a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java @@ -162,7 +162,10 @@ public class InstanceManager { InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); JobInstanceAlarm content = new JobInstanceAlarm(); BeanUtils.copyProperties(jobInfo, content); - BeanUtils.copyProperties(instanceInfo, content); + // 清理数据库后可能导致 instanceInfo 为空,进而导致 NPE + if (instanceInfo != null) { + BeanUtils.copyProperties(instanceInfo, content); + } List userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds()); AlarmCenter.alarmFailed(content, userList); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java index aafebb7d..adf666b4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java @@ -2,10 +2,7 @@ package com.github.kfcfans.powerjob.server.service.instance; import akka.actor.ActorSelection; import akka.pattern.Patterns; -import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.PowerJobException; -import com.github.kfcfans.powerjob.common.RemoteConstant; -import com.github.kfcfans.powerjob.common.SystemInstanceResult; +import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq; @@ -14,6 +11,7 @@ import com.github.kfcfans.powerjob.common.response.InstanceInfoDTO; import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.common.constans.InstanceType; import com.github.kfcfans.powerjob.server.common.redirect.DesignateServer; +import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils; import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; @@ -28,8 +26,10 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.Duration; import java.util.Date; +import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static com.github.kfcfans.powerjob.common.InstanceStatus.RUNNING; import static com.github.kfcfans.powerjob.common.InstanceStatus.STOPPED; @@ -201,16 +201,21 @@ public class InstanceService { } } + public List queryInstanceInfo(PowerQuery powerQuery) { + return instanceInfoRepository + .findAll(QueryConvertUtils.toSpecification(powerQuery)) + .stream() + .map(InstanceService::directConvert) + .collect(Collectors.toList()); + } + /** * 获取任务实例的信息 * @param instanceId 任务实例ID * @return 任务实例的信息 */ public InstanceInfoDTO getInstanceInfo(Long instanceId) { - InstanceInfoDO instanceInfoDO = fetchInstanceInfo(instanceId); - InstanceInfoDTO instanceInfoDTO = new InstanceInfoDTO(); - BeanUtils.copyProperties(instanceInfoDO, instanceInfoDTO); - return instanceInfoDTO; + return directConvert(fetchInstanceInfo(instanceId)); } /** @@ -276,4 +281,10 @@ public class InstanceService { } return instanceInfoDO; } + + private static InstanceInfoDTO directConvert(InstanceInfoDO instanceInfoDO) { + InstanceInfoDTO instanceInfoDTO = new InstanceInfoDTO(); + BeanUtils.copyProperties(instanceInfoDO, instanceInfoDTO); + return instanceInfoDTO; + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/DatabaseLockService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/DatabaseLockService.java index 22f3d0ee..a230d870 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/DatabaseLockService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/DatabaseLockService.java @@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.service.lock; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; +import com.github.kfcfans.powerjob.server.extension.LockService; import com.github.kfcfans.powerjob.server.persistence.core.model.OmsLockDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.OmsLockRepository; import lombok.extern.slf4j.Slf4j; @@ -24,7 +25,7 @@ public class DatabaseLockService implements LockService { private OmsLockRepository omsLockRepository; @Override - public boolean lock(String name, long maxLockTime) { + public boolean tryLock(String name, long maxLockTime) { OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost(), maxLockTime); try { @@ -43,7 +44,7 @@ public class DatabaseLockService implements LockService { log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO); unlock(name); - return lock(name, maxLockTime); + return tryLock(name, maxLockTime); } return false; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/local/UseSegmentLock.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/local/UseSegmentLock.java new file mode 100644 index 00000000..d550d476 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/local/UseSegmentLock.java @@ -0,0 +1,23 @@ +package com.github.kfcfans.powerjob.server.service.lock.local; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * use segment lock to make concurrent safe + * + * @author tjq + * @since 1/16/21 + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface UseSegmentLock { + + String type(); + + String key(); + + int concurrencyLevel(); +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/local/UseSegmentLockAspect.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/local/UseSegmentLockAspect.java new file mode 100644 index 00000000..2ff67f1b --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/local/UseSegmentLockAspect.java @@ -0,0 +1,43 @@ +package com.github.kfcfans.powerjob.server.service.lock.local; + +import com.github.kfcfans.powerjob.common.utils.SegmentLock; +import com.github.kfcfans.powerjob.server.common.utils.AOPUtils; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * aspect for @UseSegmentLock + * + * @author tjq + * @since 1/16/21 + */ +@Slf4j +@Aspect +@Component +public class UseSegmentLockAspect { + + private final Map lockStore = Maps.newConcurrentMap(); + + @Around(value = "@annotation(useSegmentLock))") + public Object execute(ProceedingJoinPoint point, UseSegmentLock useSegmentLock) throws Throwable { + SegmentLock segmentLock = lockStore.computeIfAbsent(useSegmentLock.type(), ignore -> { + int concurrencyLevel = useSegmentLock.concurrencyLevel(); + log.info("[UseSegmentLockAspect] create SegmentLock for [{}] with concurrencyLevel: {}", useSegmentLock.type(), concurrencyLevel); + return new SegmentLock(concurrencyLevel); + }); + + int index = AOPUtils.parseSpEl(AOPUtils.parseMethod(point), point.getArgs(), useSegmentLock.key(), Integer.class, 1); + try { + segmentLock.lockInterruptibleSafe(index); + return point.proceed(); + } finally { + segmentLock.unlock(index); + } + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index e8eaf305..60bbcdfb 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -7,7 +7,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; -import com.github.kfcfans.powerjob.server.service.lock.LockService; +import com.github.kfcfans.powerjob.server.extension.LockService; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import lombok.extern.slf4j.Slf4j; @@ -78,7 +78,7 @@ public class CleanService { */ private void cleanByOneServer() { // 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了 - boolean lock = lockService.lock(HISTORY_DELETE_LOCK, 10 * 60 * 1000); + boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, 10 * 60 * 1000); if (!lock) { log.info("[CleanService] clean job is already running, just return."); return; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java index 8ba71733..4940006a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java @@ -96,7 +96,7 @@ public class InstanceStatusCheckService { long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS; List waitingDispatchInstances = instanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold); if (!CollectionUtils.isEmpty(waitingDispatchInstances)) { - log.warn("[InstanceStatusChecker] instances({}) is not triggered as expected.", waitingDispatchInstances); + log.warn("[InstanceStatusChecker] find some instance which is not triggered as expected: {}", waitingDispatchInstances); waitingDispatchInstances.forEach(instance -> { // 过滤因为失败重试而改成 WAITING_DISPATCH 状态的任务实例 @@ -105,9 +105,13 @@ public class InstanceStatusCheckService { return; } - // 重新派发(orElseGet用于消除编译器警告...) - JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); - dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), 0); + Optional jobInfoOpt = jobInfoRepository.findById(instance.getJobId()); + if (jobInfoOpt.isPresent()) { + dispatchService.redispatch(jobInfoOpt.get(), instance.getInstanceId(), 0); + } else { + log.warn("[InstanceStatusChecker] can't find job by jobId[{}], so redispatch failed, failed instance: {}", instance.getJobId(), instance); + updateFailedInstance(instance, SystemInstanceResult.CAN_NOT_FIND_JOB_INFO); + } }); } @@ -136,7 +140,7 @@ public class InstanceStatusCheckService { // 如果任务已关闭,则不进行重试,将任务置为失败即可;秒级任务也直接置为失败,由派发器重新调度 if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) { - updateFailedInstance(instance); + updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT); return; } @@ -144,7 +148,7 @@ public class InstanceStatusCheckService { if (instance.getRunningTimes() < jobInfoDO.getInstanceRetryNum()) { dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); }else { - updateFailedInstance(instance); + updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT); } }); @@ -182,14 +186,14 @@ public class InstanceStatusCheckService { /** * 处理上报超时而失败的任务实例 */ - private void updateFailedInstance(InstanceInfoDO instance) { + private void updateFailedInstance(InstanceInfoDO instance, String result) { - log.warn("[InstanceStatusCheckService] detected instance(instanceId={},jobId={})'s TaskTracker report timeout,this instance is considered a failure.", instance.getInstanceId(), instance.getJobId()); + log.warn("[InstanceStatusChecker] instance[{}] failed due to {}, instanceInfo: {}", instance.getInstanceId(), result, instance); instance.setStatus(InstanceStatus.FAILED.getV()); instance.setFinishedTime(System.currentTimeMillis()); instance.setGmtModified(new Date()); - instance.setResult(SystemInstanceResult.REPORT_TIMEOUT); + instance.setResult(result); instanceInfoRepository.saveAndFlush(instance); instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, SystemInstanceResult.REPORT_TIMEOUT); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java index a9bf3968..53ea13ce 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java @@ -2,7 +2,9 @@ package com.github.kfcfans.powerjob.server.web.controller; import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.OpenAPIConstant; +import com.github.kfcfans.powerjob.common.PowerQuery; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; +import com.github.kfcfans.powerjob.common.request.query.JobInfoQuery; import com.github.kfcfans.powerjob.server.service.AppInfoService; import com.github.kfcfans.powerjob.server.service.CacheService; import com.github.kfcfans.powerjob.server.service.JobService; @@ -14,6 +16,7 @@ import com.github.kfcfans.powerjob.common.response.*; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; +import java.util.List; /** * 开放接口(OpenAPI)控制器,对接 oms-client @@ -60,6 +63,16 @@ public class OpenAPIController { return ResultDTO.success(jobService.fetchJob(jobId)); } + @PostMapping(OpenAPIConstant.FETCH_ALL_JOB) + public ResultDTO> fetchAllJob(Long appId) { + return ResultDTO.success(jobService.fetchAllJob(appId)); + } + + @PostMapping(OpenAPIConstant.QUERY_JOB) + public ResultDTO> queryJob(@RequestBody JobInfoQuery powerQuery) { + return ResultDTO.success(jobService.queryJob(powerQuery)); + } + @PostMapping(OpenAPIConstant.DELETE_JOB) public ResultDTO deleteJob(Long jobId, Long appId) { checkJobIdValid(jobId, appId); @@ -119,6 +132,11 @@ public class OpenAPIController { return ResultDTO.success(instanceService.getInstanceInfo(instanceId)); } + @PostMapping(OpenAPIConstant.QUERY_INSTANCE) + public ResultDTO> queryInstance(@RequestBody PowerQuery powerQuery) { + return ResultDTO.success(instanceService.queryInstanceInfo(powerQuery)); + } + /* ************* Workflow 区 ************* */ @PostMapping(OpenAPIConstant.SAVE_WORKFLOW) public ResultDTO saveWorkflow(@RequestBody SaveWorkflowRequest request) throws Exception { diff --git a/powerjob-server/src/main/resources/banner.txt b/powerjob-server/src/main/resources/banner.txt index 6e896f51..b19a0c68 100644 --- a/powerjob-server/src/main/resources/banner.txt +++ b/powerjob-server/src/main/resources/banner.txt @@ -8,7 +8,7 @@ ${AnsiColor.GREEN} ░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████ ░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░ ${AnsiColor.BRIGHT_RED} -* Maintainer: tengjiqi@gmail.com & PowerJob-Team +* Maintainer: tengjiqi@gmail.com & Team PowerJob * OfficialWebsite: http://www.powerjob.tech/ * SourceCode: https://github.com/PowerJob/PowerJob * PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4) diff --git a/powerjob-server/src/main/resources/static/js/7.js b/powerjob-server/src/main/resources/static/js/7.js index 1e490d42..3fa247e7 100644 --- a/powerjob-server/src/main/resources/static/js/7.js +++ b/powerjob-server/src/main/resources/static/js/7.js @@ -8,7 +8,7 @@ /***/ (function(module, __webpack_exports__, __webpack_require__) { "use strict"; -eval("__webpack_require__.r(__webpack_exports__);\n/* harmony import */ var core_js_modules_es_array_for_each__WEBPACK_IMPORTED_MODULE_0__ = __webpack_require__(/*! core-js/modules/es.array.for-each */ \"./node_modules/core-js/modules/es.array.for-each.js\");\n/* harmony import */ var core_js_modules_es_array_for_each__WEBPACK_IMPORTED_MODULE_0___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_array_for_each__WEBPACK_IMPORTED_MODULE_0__);\n/* harmony import */ var core_js_modules_web_dom_collections_for_each__WEBPACK_IMPORTED_MODULE_1__ = __webpack_require__(/*! core-js/modules/web.dom-collections.for-each */ \"./node_modules/core-js/modules/web.dom-collections.for-each.js\");\n/* harmony import */ var core_js_modules_web_dom_collections_for_each__WEBPACK_IMPORTED_MODULE_1___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_web_dom_collections_for_each__WEBPACK_IMPORTED_MODULE_1__);\n\n\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"Welcome\",\n data: function data() {\n return {\n // 应用注册表单是否可见\n appRegisterFormVisible: false,\n // 用户注册表单是否可见\n userRegisterFormVisible: false,\n // 应用注册表单对象\n appRegisterForm: {\n appName: \"\",\n password: undefined\n },\n // 用户注册表单对象\n userRegisterForm: {\n username: \"\",\n phone: \"\",\n email: \"\",\n webHook: \"\"\n },\n // 控制台登陆对象\n appLoginForm: {\n appName: undefined,\n password: undefined\n },\n // 是否保持登录状态\n stayLogged: true\n };\n },\n methods: {\n // 请求应用下拉框数据\n queryAppNames: function queryAppNames(queryString, cb) {\n var array = [];\n var that = this;\n var url = \"/appInfo/list?condition=\" + queryString;\n this.axios.get(url).then(function (result) {\n result.forEach(function (appInfo) {\n array.push({\n \"value\": appInfo.appName\n });\n cb(array);\n });\n }, function (error) {\n return that.$message.error(error);\n });\n clearTimeout(this.timeout);\n this.timeout = setTimeout(function () {\n cb(array);\n }, 3000);\n },\n // 注册应用\n registerApp: function registerApp() {\n var _this = this;\n\n var that = this;\n this.axios.post(\"/appInfo/save\", this.appRegisterForm).then(function () {\n that.$message.success(_this.$t('message.success'));\n that.appRegisterFormVisible = false;\n }, that.appRegisterFormVisible = false);\n },\n // 注册用户(仅用于报警通知)\n registerUser: function registerUser() {\n var _this2 = this;\n\n var that = this;\n this.axios.post(\"/user/save\", this.userRegisterForm).then(function () {\n that.$message.success(_this2.$t('message.success'));\n that.userRegisterFormVisible = false;\n }, that.userRegisterFormVisible = false);\n },\n // 登陆控制台\n login: function login() {\n var _this3 = this;\n\n var that = this;\n this.axios.post(\"/appInfo/assert\", this.appLoginForm).then(function (res) {\n // 勾选了保持登录状态,就开启自动登录,直接本地存用户名密码(内部系统浏览器明文存问题不大)\n if (_this3.stayLogged) {\n window.localStorage.setItem('oms_auto_login', JSON.stringify(_this3.appLoginForm));\n }\n\n var appInfo = {\n id: res,\n appName: that.appLoginForm.appName\n }; // 将 appId 存储到 VueStore\n\n _this3.$store.commit(\"initAppInfo\", appInfo); // 跳转到主界面\n\n\n _this3.$router.push(\"/oms/home\");\n }, function (error) {\n window.localStorage.removeItem('oms_auto_login');\n that.$message.error(error);\n });\n },\n // 自动登录\n autoLogin: function autoLogin() {\n var autoLoginString = window.localStorage.getItem(\"oms_auto_login\");\n\n if (autoLoginString === undefined || autoLoginString === null) {\n return;\n }\n\n this.appLoginForm = JSON.parse(autoLoginString);\n this.login();\n }\n },\n mounted: function mounted() {\n // 加载默认语言配置文件\n var localLang = window.localStorage.getItem('oms_lang');\n console.log(\"language from localStorage is %o\", localLang);\n\n if (localLang != null) {\n this.$i18n.locale = localLang;\n } else {\n var lang = navigator.language;\n console.log(\"language from system is %o\", lang);\n\n switch (lang) {\n case \"zh-HK\":\n case \"zh-TW\":\n case \"zh-SG\":\n case \"zh-CN\":\n this.$i18n.locale = \"cn\";\n break;\n\n default:\n this.$i18n.locale = \"en\";\n }\n } // 自动登录\n\n\n this.autoLogin();\n }\n});\n\n//# sourceURL=webpack:///./src/components/Welcome.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options"); +eval("__webpack_require__.r(__webpack_exports__);\n/* harmony import */ var core_js_modules_es_array_for_each__WEBPACK_IMPORTED_MODULE_0__ = __webpack_require__(/*! core-js/modules/es.array.for-each */ \"./node_modules/core-js/modules/es.array.for-each.js\");\n/* harmony import */ var core_js_modules_es_array_for_each__WEBPACK_IMPORTED_MODULE_0___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_array_for_each__WEBPACK_IMPORTED_MODULE_0__);\n/* harmony import */ var core_js_modules_web_dom_collections_for_each__WEBPACK_IMPORTED_MODULE_1__ = __webpack_require__(/*! core-js/modules/web.dom-collections.for-each */ \"./node_modules/core-js/modules/web.dom-collections.for-each.js\");\n/* harmony import */ var core_js_modules_web_dom_collections_for_each__WEBPACK_IMPORTED_MODULE_1___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_web_dom_collections_for_each__WEBPACK_IMPORTED_MODULE_1__);\n\n\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"Welcome\",\n data: function data() {\n return {\n // 应用注册表单是否可见\n appRegisterFormVisible: false,\n // 用户注册表单是否可见\n userRegisterFormVisible: false,\n // 应用注册表单对象\n appRegisterForm: {\n appName: \"\",\n password: undefined\n },\n // 用户注册表单对象\n userRegisterForm: {\n username: \"\",\n phone: \"\",\n email: \"\",\n webHook: \"\"\n },\n // 控制台登陆对象\n appLoginForm: {\n appName: undefined,\n password: undefined\n },\n // 是否保持登录状态\n stayLogged: true\n };\n },\n methods: {\n // 请求应用下拉框数据\n queryAppNames: function queryAppNames(queryString, cb) {\n var array = [];\n var that = this;\n var url = \"/appInfo/list?condition=\" + queryString;\n this.axios.get(url).then(function (result) {\n result.forEach(function (appInfo) {\n array.push({\n \"value\": appInfo.appName\n });\n cb(array);\n });\n }, function (error) {\n return that.$message.error(error);\n });\n clearTimeout(this.timeout);\n this.timeout = setTimeout(function () {\n cb(array);\n }, 3000);\n },\n // 注册应用\n registerApp: function registerApp() {\n var _this = this;\n\n var that = this;\n this.axios.post(\"/appInfo/save\", this.appRegisterForm).then(function () {\n that.$message.success(_this.$t('message.success'));\n that.appRegisterFormVisible = false;\n }, that.appRegisterFormVisible = false);\n },\n // 注册用户(仅用于报警通知)\n registerUser: function registerUser() {\n var _this2 = this;\n\n var that = this;\n this.axios.post(\"/user/save\", this.userRegisterForm).then(function () {\n that.$message.success(_this2.$t('message.success'));\n that.userRegisterFormVisible = false;\n }, that.userRegisterFormVisible = false);\n },\n // 登陆控制台\n login: function login() {\n var _this3 = this;\n\n var that = this;\n this.axios.post(\"/appInfo/assert\", this.appLoginForm).then(function (res) {\n // 勾选了保持登录状态,就开启自动登录,直接本地存用户名密码(内部系统浏览器明文存问题不大)\n if (_this3.stayLogged) {\n window.localStorage.setItem('oms_auto_login', JSON.stringify(_this3.appLoginForm));\n }\n\n var appInfo = {\n id: res,\n appName: that.appLoginForm.appName\n }; // 将 appId 存储到 VueStore\n\n _this3.$store.commit(\"initAppInfo\", appInfo); // 跳转到主界面\n\n\n _this3.$router.push(\"/oms/home\");\n }, function (error) {\n window.localStorage.removeItem('oms_auto_login');\n that.$message.error(error);\n });\n },\n // 自动登录\n autoLogin: function autoLogin() {\n var autoLoginString = window.localStorage.getItem(\"oms_auto_login\");\n\n if (autoLoginString === undefined || autoLoginString === null) {\n return;\n }\n\n this.appLoginForm = JSON.parse(autoLoginString);\n this.login();\n },\n // 通过 URL 自动登陆\n loginByUrlParams: function loginByUrlParams() {\n var appName = this.$route.query.appName;\n var password = this.$route.query.password;\n console.log(\"login params from url: %o, %o\", appName, password);\n\n if (appName === undefined || appName === null || appName === '') {\n return;\n }\n\n this.appLoginForm.appName = appName;\n this.appLoginForm.password = password;\n this.login();\n }\n },\n mounted: function mounted() {\n // 加载默认语言配置文件\n var localLang = window.localStorage.getItem('oms_lang');\n console.log(\"language from localStorage is %o\", localLang);\n\n if (localLang != null) {\n this.$i18n.locale = localLang;\n } else {\n var lang = navigator.language;\n console.log(\"language from system is %o\", lang);\n\n switch (lang) {\n case \"zh-HK\":\n case \"zh-TW\":\n case \"zh-SG\":\n case \"zh-CN\":\n this.$i18n.locale = \"cn\";\n break;\n\n default:\n this.$i18n.locale = \"en\";\n }\n } // 根据 URL 自动登陆\n\n\n this.loginByUrlParams(); // 根据历史记录自动登陆\n\n this.autoLogin();\n }\n});\n\n//# sourceURL=webpack:///./src/components/Welcome.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options"); /***/ }), diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/common/utils/QueryConvertUtilsTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/common/utils/QueryConvertUtilsTest.java new file mode 100644 index 00000000..d34595fb --- /dev/null +++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/common/utils/QueryConvertUtilsTest.java @@ -0,0 +1,59 @@ +package com.github.kfcfans.powerjob.server.common.utils; + +import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.powerjob.common.PowerQuery; +import com.github.kfcfans.powerjob.common.response.JobInfoDTO; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.powerjob.server.service.JobService; +import lombok.Data; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.lang3.time.DateUtils; +import org.assertj.core.util.Lists; +import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.jpa.domain.Specification; +import org.springframework.test.context.junit4.SpringRunner; + +import javax.annotation.Resource; + +import java.util.Date; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * test QueryConvertUtils + * + * @author tjq + * @since 2021/1/16 + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +class QueryConvertUtilsTest { + + @Resource + private JobService jobService; + + @Test + void autoConvert() { + JobInfoQuery jobInfoQuery = new JobInfoQuery(); + jobInfoQuery.setAppIdEq(1L); + jobInfoQuery.setJobNameLike("DAG"); + jobInfoQuery.setStatusIn(Lists.newArrayList(1)); + jobInfoQuery.setGmtCreateGt(DateUtils.addDays(new Date(), -300)); + + List list = jobService.queryJob(jobInfoQuery); + System.out.println("size: " + list.size()); + System.out.println(JSONObject.toJSONString(list)); + } + + @Getter + @Setter + public static class JobInfoQuery extends PowerQuery { + private String jobNameLike; + private Date gmtCreateGt; + private List statusIn; + } +} \ No newline at end of file diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/ServiceTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/ServiceTest.java index e7539a85..66e1a243 100644 --- a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/ServiceTest.java +++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/ServiceTest.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.server.test; import com.github.kfcfans.powerjob.server.service.id.IdGenerateService; -import com.github.kfcfans.powerjob.server.service.lock.LockService; +import com.github.kfcfans.powerjob.server.extension.LockService; import com.github.kfcfans.powerjob.server.service.timing.CleanService; import org.junit.Test; import org.junit.runner.RunWith; @@ -32,8 +32,8 @@ public class ServiceTest { public void testLockService() { String lockName = "myLock"; - lockService.lock(lockName, 10000); - lockService.lock(lockName, 10000); + lockService.tryLock(lockName, 10000); + lockService.tryLock(lockName, 10000); lockService.unlock(lockName); } diff --git a/powerjob-server/src/test/resources/application.properties b/powerjob-server/src/test/resources/application.properties index 4cbbeca3..ef264fe6 100644 --- a/powerjob-server/src/test/resources/application.properties +++ b/powerjob-server/src/test/resources/application.properties @@ -14,14 +14,15 @@ spring.servlet.multipart.max-request-size=209715200 ####### 数据库配置 ####### spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver -spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/oms-daily?useUnicode=true&characterEncoding=UTF-8 +spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai spring.datasource.core.username=root spring.datasource.core.password=No1Bug2Please3! spring.datasource.core.hikari.maximum-pool-size=20 spring.datasource.core.hikari.minimum-idle=5 ####### mongoDB配置,非核心依赖,可移除 ####### -spring.data.mongodb.uri=mongodb://remotehost:27017/oms-daily +oms.mongodb.enable=true +spring.data.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority ###### OhMyScheduler 自身配置(该配置只允许存在于 application.properties 文件中) ###### # akka ActorSystem 服务端口 diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index aa2c813c..b108e4c3 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker-agent - 3.4.3 + 3.4.4 jar - 3.4.3 + 3.4.4 1.2.3 4.3.2 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index 1f3b70e6..ee816b24 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-samples - 3.4.3 + 3.4.4 2.2.6.RELEASE - 3.4.3 + 3.4.4 1.2.68 diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index ea70054f..1e0d7a92 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-spring-boot-starter - 3.4.3 + 3.4.4 jar - 3.4.3 + 3.4.4 2.2.6.RELEASE diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index 9667fbbb..536f28db 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker - 3.4.3 + 3.4.4 jar 5.2.4.RELEASE - 3.4.3 + 3.4.4 1.4.200 3.4.2 5.6.1 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index f4909ae3..c7ed24aa 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -198,8 +198,8 @@ public class ProcessorTracker { int poolSize = calThreadPoolSize(); // 待执行队列,为了防止对内存造成较大压力,内存队列不能太大 BlockingQueue queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE); - // 自定义线程池中线程名称 - ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build(); + // 自定义线程池中线程名称 (PowerJob Processor Pool -> PPP) + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPP-%d").build(); // 拒绝策略:直接抛出异常 RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy(); @@ -214,8 +214,8 @@ public class ProcessorTracker { */ private void initTimingJob() { - // 全称 oms-ProcessTracker-TimingPool - ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-ProcessorTrackerTimingPool-%d").build(); + // PowerJob Processor TimingPool + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPT-%d").build(); timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory); timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS); @@ -340,13 +340,17 @@ public class ProcessorTracker { ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType()); - if (executeType == ExecuteType.MAP_REDUCE) { - return instanceInfo.getThreadConcurrency(); - } // 脚本类自带线程池,不过为了少一点逻辑判断,还是象征性分配一个线程 if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) { return 1; } + + if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) { + return instanceInfo.getThreadConcurrency(); + } + if (TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) { + return instanceInfo.getThreadConcurrency(); + } return 2; } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index d5e05d0f..7a928c6e 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -57,7 +57,8 @@ public class CommonTaskTracker extends TaskTracker { persistenceRootTask(); // 开启定时状态检查 - scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 13, 13, TimeUnit.SECONDS); + int delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "13")); + scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 3, delay, TimeUnit.SECONDS); // 如果是 MR 任务,则需要启动执行器动态检测装置 ExecuteType executeType = ExecuteType.valueOf(req.getExecuteType()); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index a50029b3..4cef1603 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -173,7 +173,7 @@ public class FrequentTaskTracker extends TaskTracker { // 判断是否超出最大执行实例数 if (maxInstanceNum > 0) { if (timeExpressionType == TimeExpressionType.FIXED_RATE) { - if (subInstanceId2TimeHolder.size() > maxInstanceNum) { + if (subInstanceId2TimeHolder.size() >= maxInstanceNum) { log.warn("[FQTaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId); processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE"); return;