diff --git a/README.md b/README.md index 5800098a..f0115858 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +English | [简体中文](./README_zhCN.md) +

PowerJob

@@ -9,68 +11,75 @@ LICENSE

-PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架,能让您轻松完成作业的调度与繁杂任务的分布式计算。 -# 简介 -### 主要特性 -* 使用简单:提供前端Web界面,允许开发者可视化地完成调度任务的管理(增、删、改、查)、任务运行状态监控和运行日志查看等功能。 -* 定时策略完善:支持CRON表达式、固定频率、固定延迟和API四种定时调度策略。 -* 执行模式丰富:支持单机、广播、Map、MapReduce四种执行模式,其中Map/MapReduce处理器能使开发者寥寥数行代码便获得集群分布式计算的能力。 -* DAG工作流支持:支持在线配置任务依赖关系,可视化得对任务进行编排,同时还支持上下游任务间的数据传递 -* 执行器支持广泛:支持Spring Bean、内置/外置Java类、Shell、Python等处理器,应用范围广。 -* 运维便捷:支持在线日志功能,执行器产生的日志可以在前端控制台页面实时显示,降低debug成本,极大地提高开发效率。 -* 依赖精简:最小仅依赖关系型数据库(MySQL/Oracle/MS SQLServer...),扩展依赖为MongoDB(用于存储庞大的在线日志)。 -* 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。 -* 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。 +- Have you ever wondered how cron jobs could be organized orderly? +- Have you ever felt upset when scheduling tasks suddenly terminated without any warning? +- Have you ever felt helpless when batches of business tasks require handling? +- Have you ever felt depressed about tasks that carry with complex dependencies? -### 适用场景 -* 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。 -* 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。 -* 有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce处理器完成任务的分发,调动整个集群加速计算。 -* 有需要**延迟执行**某些任务的业务场景:比如订单过期处理等。 +Well, PowerJob is there for you, it is the choice of a new generation. It is a powerful, business-oriented scheduling framework that provides distributed computing ability. Based on Akka architecture, it makes everything with scheduling easier. Just with several steps, PowerJob could be deployed and work for you! -### 设计目标 -PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server,旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。 +# Introduction -### 在线试用 -试用地址:[try.powerjob.tech](http://try.powerjob.tech/) -试用应用名称:powerjob-agent-test -控制台密码:123 +### Features +- Simple to use: PowerJob provides a friendly front-end Web that allows developers to visually manage tasks (Create, Read, Update and Delete), monitor tasks, and view logs online. +- Complete timing strategy: PowerJob supports four different scheduling strategies, including CRON expression, fixed frequency timing, fixed delay timing as well as the Open API. +- Various execution modes: PowerJob supports four execution modes: stand-alone, broadcast, Map, and MapReduce. It's worth mentioning the Map and MapReduce modes. With several lines of codes, developers could take full advantage of PowerJob's distributed computing ability. +- Complete workflow support. PowerJob supports DAG(Directed acyclic graph) based online task configuration. Developers could arrange tasks on the console, while data could be transferred among tasks on the flow. +- Extensive executor support: PowerJob supports multiple processors, including Spring Beans, ordinary Java objects, Shell, Python and so on. +- Simple in dependency: PowerJob aims to be simple in dependency. The only dependency is merely database (MySQL / Oracle / MS SQLServer ...), with MongoDB being the extra dependency for storing large log files online. +- High availability and performance: Unlike traditional job-scheduling frameworks that rely on database locks, PowerJob server is lock-free. PowerJob supports unlimited horizontal expansion. It's easy to achieve high availability and performance by deploying as many PowerJob server instances as you need. +- Quick failover and recovery support: Whenever any task failed, PowerJob server would retry according to the configured strategy. As long as there were enough nodes in the cluster, the failed tasks could execute successfully finally. +- Convenient to run and maintain: PowerJob supports online logging. Logs generated by the worker would be transferred and displayed on the console instantly, therefore reducing the cost of debugging and improving the efficiency significantly. -[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn) +### Applicable scenes -### 同类产品对比 -| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | -| -------------- | ------------------------ | ---------------------------------------- | ------------------------------------------------- | ------------------------------------------------------------ | -| 定时类型 | CRON | CRON | CRON、固定频率、固定延迟、OpenAPI | **CRON、固定频率、固定延迟、OpenAPI** | -| 任务类型 | 内置Java | 内置Java、GLUE Java、Shell、Python等脚本 | 内置Java、外置Java(FatJar)、Shell、Python等脚本 | **内置Java、外置Java(容器)、Shell、Python等脚本** | -| 分布式计算 | 无 | 静态分片 | MapReduce动态分片 | **MapReduce动态分片** | -| 在线任务治理 | 不支持 | 支持 | 支持 | **支持** | -| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** | -| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** | -| 报警监控 | 无 | 邮件 | 短信 | **WebHook、邮件、钉钉与自定义扩展** | -| 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** | -| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** | +- Scenarios with timed tasks: such as full synchronization of data at midnight, generating business reports at desired time. +- Scenarios that require all machines to run tasks simultaneously: such as log cleanup. +- Scenarios that require distributed processing: For example, a large amount of data requires updating, while the stand-alone execution takes quite a lot of time. The Map/MapReduce mode could be applied in which the workers would join the cluster for PowerJob server to dispatch, to speed up the time-consuming process, therefore improving the computing ability of the whole cluster. +- Scenarios with delayed tasks: For instance, disposal of overdue orders. +### Design goals -# 官方文档 -**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** +PowerJob aims to be an enterprise scheduling middleware. By deploying PowerJob-server as the scheduling center, +all the applications could gain scheduling and distributed computing ability relying on PowerJob-worker. -**[Document](https://www.yuque.com/powerjob/en/xrdoqw)** +### Online trial -PS:感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献! +Trial address: [Online Trial Address](http://try.powerjob.tech/) +Application name: powerjob-agent-test +Application password: 123 -# 接入登记 -[点击进行接入登记,为 PowerJob 的发展贡献自己的力量!](https://github.com/KFCFans/PowerJob/issues/6) +### Comparison with similar products -ღ( ´・ᴗ・\` )ღ 感谢以下接入用户的大力支持 ღ( ´・ᴗ・\` )ღ +| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | +| ---------------------------------- | --------------------------------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | +| Timing type | CRON | CRON | CRON, fixed frequency, fixed delay, OpenAPI | **CRON, fixed frequency, fixed delay, OpenAPI** | +| Task type | Built-in Java | Built-in Java, GLUE Java, Shell, Python and other scripts | Built-in Java, external Java (FatJar), Shell, Python and other scripts | **Built-in Java, external Java (container), Shell, Python and other scripts** | +| Distributed strategy | Unsupported | Static sharding | MapReduce dynamic sharding | **MapReduce dynamic sharding** | +| Online task management | Unsupported | Supported | Supported | **Supported** | +| Online logging | Unsupported | Supported | Unsupported | **Supported** | +| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** | +| Alarm monitoring | Unsupported | Email | SMS | **Email, WebHook, DingTalk. An interface is provided for customization.** | +| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | RMB (Public Beta version for free, hey, helping to promote) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** | +| workflow | Unsupported | Unsupported | Supported | **Supported** | + +# Document +**[Docs](https://www.yuque.com/powerjob/en/introduce)** -

+**[中文文档](https://www.yuque.com/powerjob/product)** + +# User Registration +[Click to register as PowerJob user and contribute to PowerJob!](https://github.com/KFCFans/PowerJob/issues/6) +ღ( ´・ᴗ・\` )ღ Many thanks to the following registered users. ღ( ´・ᴗ・\` )ღ +

PowerJob User

-# 其他 -* 开源许可证:Apache License, Version 2.0 -* 欢迎共同参与本项目的贡献,PR和Issue都大大滴欢迎(求求了)~ -* 觉得还不错的话,可以点个Star支持一下哦~ = ̄ω ̄= -* 联系方式@KFCFans -> `tengjiqi@gmail.com` -* 用户交流QQ群:487453839 \ No newline at end of file + +# Others + +- PowerJob is permanently open source software(Apache License, Version 2.0), please feel free to try, deploy and put into production! +- Author of PowerJob (@KFCFans) has abundant time for maintenance, and is willing to provide technical support if you have needs! +- Welcome to contribute to PowerJob, both Pull Requests and Issues are precious. +- Please STAR PowerJob if it is valuable. ~ =  ̄ω ̄ = +- Do you need any help or want to propose suggestions? Please raise Github issues or contact the Author @KFCFans-> `tengjiqi@gmail.com` directly. \ No newline at end of file diff --git a/README_enUS.md b/README_enUS.md deleted file mode 100644 index 3cbcad04..00000000 --- a/README_enUS.md +++ /dev/null @@ -1,83 +0,0 @@ -

-PowerJob -

- -

-actions -Maven Central -GitHub release (latest SemVer) -LICENSE -

- -- Have you ever wondered how cron jobs could be organized orderly? -- Have you ever felt upset when scheduling tasks suddenly terminated without any warning? -- Have you ever felt helpless when batches of business tasks require handling? -- Have you ever felt depressed about tasks that carry with complex dependencies? - -Well, PowerJob is there for you, it is the choice of a new generation. It is a powerful, business-oriented scheduling framework that provides distributed computing ability. Based on Akka architecture, it makes everything with scheduling easier. Just with several steps, PowerJob could be deployed and work for you! - -# Introduction - -### Features -- Simple to use: PowerJob provides a friendly front-end Web that allows developers to visually manage tasks (Create, Read, Update and Delete), monitor tasks, and view logs online. -- Complete timing strategy: PowerJob supports four different scheduling strategies, including CRON expression, fixed frequency timing, fixed delay timing as well as the Open API. -- Various execution modes: PowerJob supports four execution modes: stand-alone, broadcast, Map, and MapReduce. It's worth mentioning the Map and MapReduce modes. With several lines of codes, developers could take full advantage of PowerJob's distributed computing ability. -- Complete workflow support. PowerJob supports DAG(Directed acyclic graph) based online task configuration. Developers could arrange tasks on the console, while data could be transferred among tasks on the flow. -- Extensive executor support: PowerJob supports multiple processors, including Spring Beans, ordinary Java objects, Shell, Python and so on. -- Simple in dependency: PowerJob aims to be simple in dependency. The only dependency is merely database (MySQL / Oracle / MS SQLServer ...), with MongoDB being the extra dependency for storing large log files online. -- High availability and performance: Unlike traditional job-scheduling frameworks that rely on database locks, PowerJob server is lock-free. PowerJob supports unlimited horizontal expansion. It's easy to achieve high availability and performance by deploying as many PowerJob server instances as you need. -- Quick failover and recovery support: Whenever any task failed, PowerJob server would retry according to the configured strategy. As long as there were enough nodes in the cluster, the failed tasks could execute successfully finally. -- Convenient to run and maintain: PowerJob supports online logging. Logs generated by the worker would be transferred and displayed on the console instantly, therefore reducing the cost of debugging and improving the efficiency significantly. - -### Applicable scenes - -- Scenarios with timed tasks: such as full synchronization of data at midnight, generating business reports at desired time. -- Scenarios that require all machines to run tasks simultaneously: such as log cleanup. -- Scenarios that require distributed processing: For example, a large amount of data requires updating, while the stand-alone execution takes quite a lot of time. The Map/MapReduce mode could be applied in which the workers would join the cluster for PowerJob server to dispatch, to speed up the time-consuming process, therefore improving the computing ability of the whole cluster. -- Scenarios with delayed tasks: For instance, disposal of overdue orders. - -### Design goals - -PowerJob aims to be an enterprise scheduling middleware. By deploying PowerJob-server as the scheduling center, -all the applications could gain scheduling and distributed computing ability relying on PowerJob-worker. - -### Online trial - -Trial address: [Online Trial Address](http://try.powerjob.tech/) -Application name: powerjob-agent-test -Application password: 123 - -### Comparison with similar products - -| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | -| ---------------------------------- | --------------------------------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | -| Timing type | CRON | CRON | CRON, fixed frequency, fixed delay, OpenAPI | **CRON, fixed frequency, fixed delay, OpenAPI** | -| Task type | Built-in Java | Built-in Java, GLUE Java, Shell, Python and other scripts | Built-in Java, external Java (FatJar), Shell, Python and other scripts | **Built-in Java, external Java (container), Shell, Python and other scripts** | -| Distributed strategy | Unsupported | Static sharding | MapReduce dynamic sharding | **MapReduce dynamic sharding** | -| Online task management | Unsupported | Supported | Supported | **Supported** | -| Online logging | Unsupported | Supported | Unsupported | **Supported** | -| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** | -| Alarm monitoring | Unsupported | Email | SMS | **Email, WebHook, DingTalk. An interface is provided for customization.** | -| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | RMB (Public Beta version for free, hey, helping to promote) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** | -| workflow | Unsupported | Unsupported | Supported | **Supported** | - -# Document -**[GitHub Wiki](https://github.com/KFCFans/PowerJob/wiki)** - -**[中文文档](https://www.yuque.com/powerjob/product)** - -# User Registration -[Click to register as PowerJob user and contribute to PowerJob!](https://github.com/KFCFans/PowerJob/issues/6) -ღ( ´・ᴗ・\` )ღ Many thanks to the following registered users. ღ( ´・ᴗ・\` )ღ -

-PowerJob User -

- - -# Others - -- PowerJob is permanently open source software(Apache License, Version 2.0), please feel free to try, deploy and put into production! -- Author of PowerJob (@KFCFans) has abundant time for maintenance, and is willing to provide technical support if you have needs! -- Welcome to contribute to PowerJob, both Pull Requests and Issues are precious. -- Please STAR PowerJob if it is valuable. ~ =  ̄ω ̄ = -- Do you need any help or want to propose suggestions? Please raise Github issues or contact the Author @KFCFans-> `tengjiqi@gmail.com` directly. \ No newline at end of file diff --git a/README_zhCN.md b/README_zhCN.md new file mode 100644 index 00000000..1482418b --- /dev/null +++ b/README_zhCN.md @@ -0,0 +1,78 @@ +[English](./README.md) | 简体中文 + +

+PowerJob +

+ +

+actions +Maven Central +GitHub release (latest SemVer) +LICENSE +

+ +PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架,能让您轻松完成作业的调度与繁杂任务的分布式计算。 +# 简介 +### 主要特性 +* 使用简单:提供前端Web界面,允许开发者可视化地完成调度任务的管理(增、删、改、查)、任务运行状态监控和运行日志查看等功能。 +* 定时策略完善:支持CRON表达式、固定频率、固定延迟和API四种定时调度策略。 +* 执行模式丰富:支持单机、广播、Map、MapReduce四种执行模式,其中Map/MapReduce处理器能使开发者寥寥数行代码便获得集群分布式计算的能力。 +* DAG工作流支持:支持在线配置任务依赖关系,可视化得对任务进行编排,同时还支持上下游任务间的数据传递 +* 执行器支持广泛:支持Spring Bean、内置/外置Java类、Shell、Python等处理器,应用范围广。 +* 运维便捷:支持在线日志功能,执行器产生的日志可以在前端控制台页面实时显示,降低debug成本,极大地提高开发效率。 +* 依赖精简:最小仅依赖关系型数据库(MySQL/Oracle/MS SQLServer...),扩展依赖为MongoDB(用于存储庞大的在线日志)。 +* 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。 +* 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。 + +### 适用场景 +* 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。 +* 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。 +* 有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce处理器完成任务的分发,调动整个集群加速计算。 +* 有需要**延迟执行**某些任务的业务场景:比如订单过期处理等。 + +### 设计目标 +PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server,旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。 + +### 在线试用 +试用地址:[try.powerjob.tech](http://try.powerjob.tech/) +试用应用名称:powerjob-agent-test +控制台密码:123 + +[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn) + +### 同类产品对比 +| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | +| -------------- | ------------------------ | ---------------------------------------- | ------------------------------------------------- | ------------------------------------------------------------ | +| 定时类型 | CRON | CRON | CRON、固定频率、固定延迟、OpenAPI | **CRON、固定频率、固定延迟、OpenAPI** | +| 任务类型 | 内置Java | 内置Java、GLUE Java、Shell、Python等脚本 | 内置Java、外置Java(FatJar)、Shell、Python等脚本 | **内置Java、外置Java(容器)、Shell、Python等脚本** | +| 分布式计算 | 无 | 静态分片 | MapReduce动态分片 | **MapReduce动态分片** | +| 在线任务治理 | 不支持 | 支持 | 支持 | **支持** | +| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** | +| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** | +| 报警监控 | 无 | 邮件 | 短信 | **WebHook、邮件、钉钉与自定义扩展** | +| 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** | +| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** | + + +# 官方文档 +**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** + +**[Document](https://www.yuque.com/powerjob/en/xrdoqw)** + +PS:感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献! + +# 接入登记 +[点击进行接入登记,为 PowerJob 的发展贡献自己的力量!](https://github.com/KFCFans/PowerJob/issues/6) + +ღ( ´・ᴗ・\` )ღ 感谢以下接入用户的大力支持 ღ( ´・ᴗ・\` )ღ + +

+PowerJob User +

+ +# 其他 +* 开源许可证:Apache License, Version 2.0 +* 欢迎共同参与本项目的贡献,PR和Issue都大大滴欢迎(求求了)~ +* 觉得还不错的话,可以点个Star支持一下哦~ = ̄ω ̄= +* 联系方式@KFCFans -> `tengjiqi@gmail.com` +* 用户交流QQ群:487453839 \ No newline at end of file diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index d49a6a17..8a8a6621 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-client - 3.4.1 + 3.4.2 jar 5.6.1 1.2.68 - 3.4.1 + 3.4.2 3.2.4 diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index e92f4a92..b1e2d8e3 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.4.1 + 3.4.2 jar diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/LogLevel.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/LogLevel.java new file mode 100644 index 00000000..58c1c920 --- /dev/null +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/LogLevel.java @@ -0,0 +1,34 @@ +package com.github.kfcfans.powerjob.common; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.Objects; + +/** + * 日志级别 + * + * @author tjq + * @since 12/20/20 + */ +@Getter +@AllArgsConstructor +public enum LogLevel { + + DEBUG(1), + INFO(2), + WARN(3), + ERROR(4); + + private final int v; + + public static String genLogLevelString(Integer v) { + + for (LogLevel logLevel : values()) { + if (Objects.equals(logLevel.v, v)) { + return logLevel.name(); + } + } + return "UNKNOWN"; + } +} diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceLogContent.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceLogContent.java index a22db900..0be65e93 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceLogContent.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceLogContent.java @@ -20,6 +20,8 @@ public class InstanceLogContent implements OmsSerializable { private long instanceId; // 日志提交时间 private long logTime; + // 级别 + private int logLevel; // 日志内容 private String logContent; } diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 80a62d2e..83885a05 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.4.1 + 3.4.2 jar 2.9.2 2.3.4.RELEASE - 3.4.1 + 3.4.2 8.0.19 19.7.0.0 @@ -214,6 +214,7 @@ + build-info repackage diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java index 804e45ce..68878f1e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java @@ -1,5 +1,8 @@ package com.github.kfcfans.powerjob.server.common.config; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.info.BuildProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; @@ -11,26 +14,40 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2; import static springfox.documentation.builders.PathSelectors.any; /** - * Swagger UI 配置 + * Configuration class for Swagger UI. * * @author tjq + * @author Jiang Jining * @since 2020/3/29 */ @Configuration @EnableSwagger2 public class SwaggerConfig { - + + private final BuildProperties buildProperties; + + public SwaggerConfig(@Autowired(required = false) final BuildProperties buildProperties) { + this.buildProperties = buildProperties; + } + @Bean public Docket createRestApi() { + String version = "unknown"; + if (buildProperties != null) { + String pomVersion = buildProperties.getVersion(); + if (StringUtils.isNotBlank(pomVersion)) { + version = pomVersion; + } + } // apiInfo()用来创建该Api的基本信息(这些基本信息会展现在文档页面中 ApiInfo apiInfo = new ApiInfoBuilder() .title("PowerJob") .description("Distributed scheduling and computing framework.") .license("Apache Licence 2") .termsOfServiceUrl("https://github.com/KFCFans/PowerJob") - .version("3.3.3") + .version(version) .build(); - + return new Docket(DocumentationType.SWAGGER_2) .apiInfo(apiInfo) // select()函数返回一个ApiSelectorBuilder实例 @@ -39,5 +56,5 @@ public class SwaggerConfig { .paths(any()) .build(); } - + } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java index 5762f0ce..6ec3f08f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java @@ -28,8 +28,8 @@ public class ThreadPoolConfig { @Bean("omsTimingPool") public Executor getTimingPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 16); - executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 32); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); // use SynchronousQueue executor.setQueueCapacity(0); executor.setKeepAliveSeconds(60); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServer.java index bc31aafd..7954b7e6 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServer.java @@ -6,7 +6,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * 执行服务器运行 + * 需要在指定的服务器运行 * 注意:该注解所在方法的参数必须为对象,不可以是 long 等基本类型 * * @author tjq diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServerAspect.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServerAspect.java index 87a47448..9d8684b3 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServerAspect.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServerAspect.java @@ -62,7 +62,7 @@ public class DesignateServerAspect { } if (appId == null) { - throw new PowerJobException("can't find appId in params!"); + throw new PowerJobException("can't find appId in params for:" + signature.toString()); } // 获取执行机器 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java index a956830c..5af69945 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java @@ -5,7 +5,6 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.GenericGenerator; -import org.hibernate.annotations.Type; import javax.persistence.*; import java.util.Date; @@ -37,7 +36,6 @@ public class InstanceInfoDO { // 任务实例参数 @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String instanceParams; // 该任务实例的类型,普通/工作流(InstanceType) @@ -51,7 +49,6 @@ public class InstanceInfoDO { // 执行结果(允许存储稍大的结果) @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String result; // 预计触发时间 private Long expectedTriggerTime; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java index ddccbf7a..b5d2b171 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java @@ -5,7 +5,6 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.GenericGenerator; -import org.hibernate.annotations.Type; import javax.persistence.*; import java.util.Date; @@ -53,7 +52,6 @@ public class JobInfoDO { // 执行器信息(可能需要存储整个脚本文件) @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String processorInfo; /* ************************** 运行时配置 ************************** */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/TypeDefConstant.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/TypeDefConstant.java deleted file mode 100644 index fbe4b199..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/TypeDefConstant.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.github.kfcfans.powerjob.server.persistence.core.model; - -/** - * @see package-info.java - * @author user - */ -public final class TypeDefConstant { - public static final String STRING_TYPE = "string-type"; -} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java index ee8c53a8..f3c5d560 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java @@ -4,7 +4,6 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.GenericGenerator; -import org.hibernate.annotations.Type; import javax.persistence.*; import java.util.Date; @@ -36,7 +35,6 @@ public class WorkflowInfoDO { // 工作流的DAG图信息(点线式DAG的json) @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String peDAG; /* ************************** 定时参数 ************************** */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java index d6290205..e6f483c4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java @@ -4,7 +4,6 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.GenericGenerator; -import org.hibernate.annotations.Type; import javax.persistence.*; import java.util.Date; @@ -40,16 +39,13 @@ public class WorkflowInstanceInfoDO { // 工作流启动参数 @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String wfInitParams; @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String dag; @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String result; // 预计触发时间 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/package-info.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/package-info.java deleted file mode 100644 index 543fd8a0..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/package-info.java +++ /dev/null @@ -1,7 +0,0 @@ -@TypeDefs({ - @TypeDef(name = TypeDefConstant.STRING_TYPE, typeClass = org.hibernate.type.StringType.class) -}) -package com.github.kfcfans.powerjob.server.persistence.core.model; - -import org.hibernate.annotations.TypeDef; -import org.hibernate.annotations.TypeDefs; \ No newline at end of file diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/local/LocalInstanceLogDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/local/LocalInstanceLogDO.java index 040f8b2b..e9e5028a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/local/LocalInstanceLogDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/local/LocalInstanceLogDO.java @@ -28,6 +28,10 @@ public class LocalInstanceLogDO { * 日志时间 */ private Long logTime; + /** + * 日志级别 {@link com.github.kfcfans.powerjob.common.LogLevel} + */ + private Integer logLevel; /** * 日志内容 */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index 3e5c41f4..6399a72f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -81,7 +81,7 @@ public class DispatchService { if (maxInstanceNum > 0) { // 这个 runningInstanceCount 已经包含了本 instance - // 不统计 WAITING_DISPATCH 的状态:使用 OpenAPI 触发的延迟任务显然不应该统计进去(比如 delay 是 1 天) + // 不统计 WAITING_DISPATCH 的状态:使用 OpenAPI 触发的延迟任务不应该统计进去(比如 delay 是 1 天) long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV())); // 超出最大同时运行限制,不执行调度 if (runningInstanceCount > maxInstanceNum) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java index b197d31e..18fde5c4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.server.service; +import com.github.kfcfans.powerjob.common.LogLevel; import com.github.kfcfans.powerjob.common.OmsConstant; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.model.InstanceLogContent; @@ -320,12 +321,16 @@ public class InstanceLogService { /** - * 拼接日志 -> 2020-04-29 22:07:10.059 192.168.1.1:2777 INFO XXX + * 拼接日志 -> 2020-04-29 22:07:10.059 [192.168.1.1:2777] INFO XXX * @param instanceLog 日志对象 * @return 字符串 */ private static String convertLog(LocalInstanceLogDO instanceLog) { - return String.format("%s [%s] -%s", dateFormat.format(instanceLog.getLogTime()), instanceLog.getWorkerAddress(), instanceLog.getLogContent()); + return String.format("%s [%s] %s %s", + dateFormat.format(instanceLog.getLogTime()), + instanceLog.getWorkerAddress(), + LogLevel.genLogLevelString(instanceLog.getLogLevel()), + instanceLog.getLogContent()); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/DatabaseLockService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/DatabaseLockService.java index 19708f67..22f3d0ee 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/DatabaseLockService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/DatabaseLockService.java @@ -4,14 +4,11 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.persistence.core.model.OmsLockDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.OmsLockRepository; -import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Service; import javax.annotation.Resource; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; /** * 基于数据库实现的分布式锁 @@ -26,35 +23,27 @@ public class DatabaseLockService implements LockService { @Resource private OmsLockRepository omsLockRepository; - private Map lockName2FailedTimes = Maps.newConcurrentMap(); - private static final int MAX_FAILED_NUM = 5; - @Override public boolean lock(String name, long maxLockTime) { - AtomicInteger failedCount = lockName2FailedTimes.computeIfAbsent(name, ignore -> new AtomicInteger(0)); OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost(), maxLockTime); try { omsLockRepository.saveAndFlush(newLock); - failedCount.set(0); return true; - }catch (DataIntegrityViolationException ignore) { - }catch (Exception e) { + } catch (DataIntegrityViolationException ignore) { + } catch (Exception e) { log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e); } - // 连续失败一段时间,需要判断是否为锁释放失败的情况 - if (failedCount.incrementAndGet() > MAX_FAILED_NUM) { + OmsLockDO omsLockDO = omsLockRepository.findByLockName(name); + long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime(); - OmsLockDO omsLockDO = omsLockRepository.findByLockName(name); - long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime(); - if (lockedMillions > omsLockDO.getMaxLockTime()) { + // 锁超时,强制释放锁并重新尝试获取 + if (lockedMillions > omsLockDO.getMaxLockTime()) { - log.warn("[DatabaseLockService] The lock({}) already timeout, will be deleted now.", omsLockDO); - unlock(name); - } else { - failedCount.set(0); - } + log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO); + unlock(name); + return lock(name, maxLockTime); } return false; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index f180ce3d..e8eaf305 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -7,6 +7,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; +import com.github.kfcfans.powerjob.server.service.lock.LockService; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import lombok.extern.slf4j.Slf4j; @@ -36,6 +37,8 @@ public class CleanService { private InstanceInfoRepository instanceInfoRepository; @Resource private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; + @Resource + private LockService lockService; @Value("${oms.instanceinfo.retention}") private int instanceInfoRetentionDay; @@ -51,6 +54,8 @@ public class CleanService { // 每天凌晨3点定时清理 private static final String CLEAN_TIME_EXPRESSION = "0 0 3 * * ?"; + private static final String HISTORY_DELETE_LOCK = "history_delete_lock"; + @Async("omsTimingPool") @Scheduled(cron = CLEAN_TIME_EXPRESSION) @@ -59,18 +64,35 @@ public class CleanService { // 释放本地缓存 WorkerManagerService.cleanUp(); - // 删除数据库运行记录 - cleanInstanceLog(); - cleanWorkflowInstanceLog(); - // 释放磁盘空间 cleanLocal(OmsFileUtils.genLogDirPath(), instanceInfoRetentionDay); cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay); cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY); - // 删除 GridFS 过期文件 - cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay); - cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay); + // 删除数据库历史的数据 + cleanByOneServer(); + } + + /** + * 只能一台server清理的操作统一到这里执行 + */ + private void cleanByOneServer() { + // 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了 + boolean lock = lockService.lock(HISTORY_DELETE_LOCK, 10 * 60 * 1000); + if (!lock) { + log.info("[CleanService] clean job is already running, just return."); + return; + } + try { + // 删除数据库运行记录 + cleanInstanceLog(); + cleanWorkflowInstanceLog(); + // 删除 GridFS 过期文件 + cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay); + cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay); + } finally { + lockService.unlock(HISTORY_DELETE_LOCK); + } } @VisibleForTesting @@ -91,7 +113,7 @@ public class CleanService { } // 计算最大偏移量 - long maxOffset = day * 24 * 60 * 60 * 1000; + long maxOffset = day * 24 * 60 * 60 * 1000L; for (File f : logFiles) { long offset = System.currentTimeMillis() - f.lastModified(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java index ddea791e..8ba71733 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java @@ -60,7 +60,7 @@ public class InstanceStatusCheckService { private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; @Async("omsTimingPool") - @Scheduled(fixedRate = 10000) + @Scheduled(fixedDelay = 10000) public void timingStatusCheck() { Stopwatch stopwatch = Stopwatch.createStarted(); @@ -115,7 +115,7 @@ public class InstanceStatusCheckService { threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS; List waitingWorkerReceiveInstances = instanceInfoRepository.findByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold); if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) { - log.warn("[InstanceStatusChecker] instances({}) didn't receive any reply from worker.", waitingWorkerReceiveInstances); + log.warn("[InstanceStatusChecker] find one instance didn't receive any reply from worker, try to redispatch: {}", waitingWorkerReceiveInstances); waitingWorkerReceiveInstances.forEach(instance -> { // 重新派发 JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index 144af178..c8a4e383 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -133,7 +133,7 @@ public class OmsScheduleService { // 1. 批量写日志表 Map jobId2InstanceId = Maps.newHashMap(); - log.info("[CronScheduler] These cron jobs will be scheduled: {}.", jobInfos); + log.info("[CronScheduler] These cron jobs will be scheduled: {}.", jobInfos); jobInfos.forEach(jobInfo -> { Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), null, null, jobInfo.getNextTriggerTime()); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index a10b8a92..adf8cc4c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -314,7 +314,7 @@ public class WorkflowInstanceManager { } /** - * 允许任务实例 + * 运行任务实例 * 需要将创建和运行任务实例分离,否则在秒失败情况下,会发生DAG覆盖更新的问题 * @param jobId 任务ID * @param instanceId 任务实例ID diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java index ae36a78a..61d806f2 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java @@ -17,6 +17,7 @@ import com.github.kfcfans.powerjob.server.web.request.QueryInstanceRequest; import com.github.kfcfans.powerjob.server.web.response.InstanceDetailVO; import com.github.kfcfans.powerjob.server.web.response.InstanceInfoVO; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; import org.springframework.beans.BeanUtils; import org.springframework.data.domain.Example; import org.springframework.data.domain.Page; @@ -28,6 +29,7 @@ import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import javax.servlet.http.HttpServletResponse; import java.io.File; +import java.net.URL; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -92,6 +94,24 @@ public class InstanceController { OmsFileUtils.file2HttpResponse(file, response); } + @GetMapping("/downloadLog4Console") + public void downloadLog4Console(Long appId, Long instanceId , HttpServletResponse response) throws Exception { + // 获取内部下载链接 + String downloadUrl = instanceLogService.fetchDownloadUrl(appId, instanceId); + // 先下载到本机 + String logFilePath = OmsFileUtils.genTemporaryWorkPath() + String.format("powerjob-%s-%s.log", appId, instanceId); + File logFile = new File(logFilePath); + + try { + FileUtils.copyURLToFile(new URL(downloadUrl), logFile); + + // 再推送到浏览器 + OmsFileUtils.file2HttpResponse(logFile, response); + } finally { + FileUtils.forceDelete(logFile); + } + } + @PostMapping("/list") public ResultDTO> list(@RequestBody QueryInstanceRequest request) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java index 7ee1cc58..a9bf3968 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java @@ -122,9 +122,6 @@ public class OpenAPIController { /* ************* Workflow 区 ************* */ @PostMapping(OpenAPIConstant.SAVE_WORKFLOW) public ResultDTO saveWorkflow(@RequestBody SaveWorkflowRequest request) throws Exception { - if (request.getId() != null) { - checkJobIdValid(request.getId(), request.getAppId()); - } return ResultDTO.success(workflowService.saveWorkflow(request)); } diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index 817bffdb..94ea9386 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker-agent - 3.4.1 + 3.4.2 jar - 3.4.1 + 3.4.2 1.2.3 4.3.2 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index cdacbb5e..6db5e26f 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-samples - 3.4.1 + 3.4.2 2.2.6.RELEASE - 3.4.1 + 3.4.2 1.2.68 diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index dfddbb77..8a6f7749 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-spring-boot-starter - 3.4.1 + 3.4.2 jar - 3.4.1 + 3.4.2 2.2.6.RELEASE diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index 9a73566b..e3dc356d 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker - 3.4.1 + 3.4.2 jar 5.2.4.RELEASE - 3.4.1 + 3.4.2 1.4.200 3.4.2 5.6.1 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/OmsLogHandler.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/OmsLogHandler.java index 0ad1261e..40aed030 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/OmsLogHandler.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/OmsLogHandler.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.worker.background; import akka.actor.ActorSelection; +import com.github.kfcfans.powerjob.common.LogLevel; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.model.InstanceLogContent; import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq; @@ -48,14 +49,14 @@ public class OmsLogHandler { * @param instanceId 任务实例ID * @param logContent 日志内容 */ - public void submitLog(long instanceId, String logContent) { + public void submitLog(long instanceId, LogLevel logLevel, String logContent) { if (logQueue.size() > REPORT_SIZE) { // 线程的生命周期是个不可循环的过程,一个线程对象结束了不能再次start,只能一直创建和销毁 new Thread(logSubmitter).start(); } - InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logContent); + InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent); logQueue.offer(tuple); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/SpringUtils.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/SpringUtils.java index b8cafad3..c495dbda 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/SpringUtils.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/SpringUtils.java @@ -1,15 +1,15 @@ package com.github.kfcfans.powerjob.worker.common.utils; +import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; -import java.util.Objects; - /** * Spring ApplicationContext 工具类 * * @author tjq * @since 2020/3/16 */ +@Slf4j public class SpringUtils { private static boolean supportSpringBean = false; @@ -43,7 +43,9 @@ public class SpringUtils { // 小写转大写 char[] cs = beanName.toCharArray(); cs[0] += 32; - return (T) context.getBean(String.valueOf(cs)); + String beanName0 = String.valueOf(cs); + log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", context, beanName0); + return (T) context.getBean(beanName0); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ProcessorBeanFactory.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ProcessorBeanFactory.java index 2b890d90..27dfbf32 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ProcessorBeanFactory.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ProcessorBeanFactory.java @@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.worker.core; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; import java.util.Map; @@ -37,7 +38,8 @@ public class ProcessorBeanFactory { return (BasicProcessor) clz.getDeclaredConstructor().newInstance(); }catch (Exception e) { - log.warn("[ProcessorBeanFactory] load local Processor(className = {}) failed, reason is {}", className, e.getMessage()); + log.warn("[ProcessorBeanFactory] load local Processor(className = {}) failed.", className, e); + ExceptionUtils.rethrow(e); } return null; }); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 4c7e42e2..15876cb6 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -24,6 +24,7 @@ import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.util.CollectionUtils; import java.util.List; @@ -54,8 +55,10 @@ public class ProcessorTracker { private OmsLogger omsLogger; // ProcessResult 上报失败的重试队列 private Queue statusReportRetryQueue; - // 上一次空闲时间 + // 上一次空闲时间(用于闲置判定) private long lastIdleTime; + // 上次完成任务数量(用于闲置判定) + private long lastCompletedTaskCount; private String taskTrackerAddress; private ActorSelection taskTrackerActorRef; @@ -87,6 +90,7 @@ public class ProcessorTracker { this.omsLogger = new OmsServerLogger(instanceId); this.statusReportRetryQueue = Queues.newLinkedBlockingQueue(); this.lastIdleTime = -1L; + this.lastCompletedTaskCount = 0L; // 初始化 线程池,TimingPool 启动的任务会检查 ThreadPool,所以必须先初始化线程池,否则NPE initThreadPool(); @@ -96,10 +100,10 @@ public class ProcessorTracker { initProcessor(); log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId); - }catch (Throwable e) { - log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, e); + } catch (Throwable t) { + log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, t); lethal = true; - lethalReason = e.toString(); + lethalReason = ExceptionUtils.getMessage(t); } } @@ -238,8 +242,9 @@ public class ProcessorTracker { } // 判断线程池活跃状态,长时间空闲则上报 TaskTracker 请求检查 - if (threadPool.getActiveCount() > 0) { + if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) { lastIdleTime = -1; + lastCompletedTaskCount = threadPool.getCompletedTaskCount(); }else { if (lastIdleTime == -1) { lastIdleTime = System.currentTimeMillis(); @@ -291,7 +296,7 @@ public class ProcessorTracker { try { processor = SpringUtils.getBean(processorInfo); }catch (Exception e) { - log.warn("[ProcessorTracker-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, e.toString()); + log.warn("[ProcessorTracker-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, ExceptionUtils.getMessage(e)); } } // 反射加载 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index 55c38634..15827976 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -446,7 +446,7 @@ public abstract class TaskTracker { // 3. 避免大查询,分批派发任务 long currentDispatchNum = 0; - long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2; + long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L; AtomicInteger index = new AtomicInteger(0); // 4. 循环查询数据库,获取需要派发的任务 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/OmsLogger.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/OmsLogger.java index 0ecc4e90..77074c18 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/OmsLogger.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/OmsLogger.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.worker.log; /** - * OhMyScheduler 在线日志,直接上报到 Server,可在控制台直接查看 + * PowerJob 在线日志,直接上报到 Server,可在控制台直接查看 * * @author tjq * @since 2020/4/21 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/impl/OmsServerLogger.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/impl/OmsServerLogger.java index 50021cc2..38d7c1b7 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/impl/OmsServerLogger.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/impl/OmsServerLogger.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.worker.log.impl; +import com.github.kfcfans.powerjob.common.LogLevel; import com.github.kfcfans.powerjob.worker.background.OmsLogHandler; import com.github.kfcfans.powerjob.worker.log.OmsLogger; import lombok.AllArgsConstructor; @@ -9,7 +10,7 @@ import org.slf4j.helpers.MessageFormatter; /** - * OhMyScheduler 在线日志,直接上报到 Server,可在控制台直接查看 + * PowerJob 在线日志,直接上报到 Server,可在控制台直接查看 * * @author tjq * @since 2020/4/21 @@ -19,45 +20,35 @@ public class OmsServerLogger implements OmsLogger { private final long instanceId; - // Level|业务方自身的日志 - private static final String LOG_PREFIX = "{} "; - @Override public void debug(String messagePattern, Object... args) { - process("DEBUG", messagePattern, args); + process(LogLevel.DEBUG, messagePattern, args); } @Override public void info(String messagePattern, Object... args) { - process("INFO", messagePattern, args); + process(LogLevel.INFO, messagePattern, args); } @Override public void warn(String messagePattern, Object... args) { - process("WARN", messagePattern, args); + process(LogLevel.WARN, messagePattern, args); } @Override public void error(String messagePattern, Object... args) { - process("ERROR", messagePattern, args); + process(LogLevel.ERROR, messagePattern, args); } /** * 生成日志内容 - * @param level 级别,DEBUG/INFO/WARN/ERROR * @param messagePattern 日志格式 * @param arg 填充参数 * @return 生成完毕的日志内容 */ - private static String genLog(String level, String messagePattern, Object... arg) { - - String pattern = LOG_PREFIX + messagePattern; - Object[] newArgs = new Object[arg.length + 1]; - newArgs[0] = level; - System.arraycopy(arg, 0, newArgs, 1, arg.length); - + private static String genLogContent(String messagePattern, Object... arg) { // 借用 Slf4J 直接生成日志信息 - FormattingTuple formattingTuple = MessageFormatter.arrayFormat(pattern, newArgs); + FormattingTuple formattingTuple = MessageFormatter.arrayFormat(messagePattern, arg); if (formattingTuple.getThrowable() != null) { String stackTrace = ExceptionUtils.getStackTrace(formattingTuple.getThrowable()); return formattingTuple.getMessage() + System.lineSeparator() + stackTrace; @@ -66,9 +57,9 @@ public class OmsServerLogger implements OmsLogger { } } - private void process(String level, String messagePattern, Object... args) { - String logContent = genLog(level, messagePattern, args); - OmsLogHandler.INSTANCE.submitLog(instanceId, logContent); + private void process(LogLevel level, String messagePattern, Object... args) { + String logContent = genLogContent(messagePattern, args); + OmsLogHandler.INSTANCE.submitLog(instanceId, level, logContent); } } \ No newline at end of file