[release] v3.4.2

This commit is contained in:
tjq 2021-01-03 14:47:12 +08:00
commit 9e6ba33f90
31 changed files with 322 additions and 228 deletions

109
README.md
View File

@ -1,3 +1,5 @@
English | [简体中文](./README_zhCN.md)
<p align="center">
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/logo.png" alt="PowerJob" title="PowerJob" width="557"/>
</p>
@ -9,68 +11,75 @@
<a href="https://github.com/KFCFans/PowerJob/blob/master/LICENSE"><img src="https://img.shields.io/github/license/KFCFans/PowerJob" alt="LICENSE"></a>
</p>
PowerJob原OhMyScheduler是全新一代分布式调度与计算框架能让您轻松完成作业的调度与繁杂任务的分布式计算。
# 简介
### 主要特性
* 使用简单提供前端Web界面允许开发者可视化地完成调度任务的管理增、删、改、查、任务运行状态监控和运行日志查看等功能。
* 定时策略完善支持CRON表达式、固定频率、固定延迟和API四种定时调度策略。
* 执行模式丰富支持单机、广播、Map、MapReduce四种执行模式其中Map/MapReduce处理器能使开发者寥寥数行代码便获得集群分布式计算的能力。
* DAG工作流支持支持在线配置任务依赖关系可视化得对任务进行编排同时还支持上下游任务间的数据传递
* 执行器支持广泛支持Spring Bean、内置/外置Java类、Shell、Python等处理器应用范围广。
* 运维便捷支持在线日志功能执行器产生的日志可以在前端控制台页面实时显示降低debug成本极大地提高开发效率。
* 依赖精简最小仅依赖关系型数据库MySQL/Oracle/MS SQLServer...扩展依赖为MongoDB用于存储庞大的在线日志
* 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。
* 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。
- Have you ever wondered how cron jobs could be organized orderly?
- Have you ever felt upset when scheduling tasks suddenly terminated without any warning?
- Have you ever felt helpless when batches of business tasks require handling?
- Have you ever felt depressed about tasks that carry with complex dependencies?
### 适用场景
* 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。
* 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
* 有需要分布式处理的业务场景比如需要更新一大批数据单机执行耗时非常长可以使用Map/MapReduce处理器完成任务的分发调动整个集群加速计算。
* 有需要**延迟执行**某些任务的业务场景:比如订单过期处理等。
Well, PowerJob is there for you, it is the choice of a new generation. It is a powerful, business-oriented scheduling framework that provides distributed computing ability. Based on Akka architecture, it makes everything with scheduling easier. Just with several steps, PowerJob could be deployed and work for you!
### 设计目标
PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。
# Introduction
### 在线试用
试用地址:[try.powerjob.tech](http://try.powerjob.tech/)
试用应用名称powerjob-agent-test
控制台密码123
### Features
- Simple to use: PowerJob provides a friendly front-end Web that allows developers to visually manage tasks (Create, Read, Update and Delete), monitor tasks, and view logs online.
- Complete timing strategy: PowerJob supports four different scheduling strategies, including CRON expression, fixed frequency timing, fixed delay timing as well as the Open API.
- Various execution modes: PowerJob supports four execution modes: stand-alone, broadcast, Map, and MapReduce. It's worth mentioning the Map and MapReduce modes. With several lines of codes, developers could take full advantage of PowerJob's distributed computing ability.
- Complete workflow support. PowerJob supports DAG(Directed acyclic graph) based online task configuration. Developers could arrange tasks on the console, while data could be transferred among tasks on the flow.
- Extensive executor support: PowerJob supports multiple processors, including Spring Beans, ordinary Java objects, Shell, Python and so on.
- Simple in dependency: PowerJob aims to be simple in dependency. The only dependency is merely database (MySQL / Oracle / MS SQLServer ...), with MongoDB being the extra dependency for storing large log files online.
- High availability and performance: Unlike traditional job-scheduling frameworks that rely on database locks, PowerJob server is lock-free. PowerJob supports unlimited horizontal expansion. It's easy to achieve high availability and performance by deploying as many PowerJob server instances as you need.
- Quick failover and recovery support: Whenever any task failed, PowerJob server would retry according to the configured strategy. As long as there were enough nodes in the cluster, the failed tasks could execute successfully finally.
- Convenient to run and maintain: PowerJob supports online logging. Logs generated by the worker would be transferred and displayed on the console instantly, therefore reducing the cost of debugging and improving the efficiency significantly.
[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn)
### Applicable scenes
### 同类产品对比
| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob |
| -------------- | ------------------------ | ---------------------------------------- | ------------------------------------------------- | ------------------------------------------------------------ |
| 定时类型 | CRON | CRON | CRON、固定频率、固定延迟、OpenAPI | **CRON、固定频率、固定延迟、OpenAPI** |
| 任务类型 | 内置Java | 内置Java、GLUE Java、Shell、Python等脚本 | 内置Java、外置JavaFatJar、Shell、Python等脚本 | **内置Java、外置Java容器、Shell、Python等脚本** |
| 分布式计算 | 无 | 静态分片 | MapReduce动态分片 | **MapReduce动态分片** |
| 在线任务治理 | 不支持 | 支持 | 支持 | **支持** |
| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** |
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** |
| 报警监控 | 无 | 邮件 | 短信 | **WebHook、邮件、钉钉与自定义扩展** |
| 系统依赖 | JDBC支持的关系型数据库MySQL、Oracle... | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库MySQL、Oracle...** |
| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** |
- Scenarios with timed tasks: such as full synchronization of data at midnight, generating business reports at desired time.
- Scenarios that require all machines to run tasks simultaneously: such as log cleanup.
- Scenarios that require distributed processing: For example, a large amount of data requires updating, while the stand-alone execution takes quite a lot of time. The Map/MapReduce mode could be applied in which the workers would join the cluster for PowerJob server to dispatch, to speed up the time-consuming process, therefore improving the computing ability of the whole cluster.
- Scenarios with delayed tasks: For instance, disposal of overdue orders.
### Design goals
# 官方文档
**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)**
PowerJob aims to be an enterprise scheduling middleware. By deploying PowerJob-server as the scheduling center,
all the applications could gain scheduling and distributed computing ability relying on PowerJob-worker.
**[Document](https://www.yuque.com/powerjob/en/xrdoqw)**
### Online trial
PS感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献!
Trial address: [Online Trial Address](http://try.powerjob.tech/)
Application name: powerjob-agent-test
Application password: 123
# 接入登记
[点击进行接入登记,为 PowerJob 的发展贡献自己的力量!](https://github.com/KFCFans/PowerJob/issues/6)
### Comparison with similar products
ღ( ´・ᴗ・\` )ღ 感谢以下接入用户的大力支持 ღ( ´・ᴗ・\` )ღ
| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob |
| ---------------------------------- | --------------------------------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
| Timing type | CRON | CRON | CRON, fixed frequency, fixed delay, OpenAPI | **CRON, fixed frequency, fixed delay, OpenAPI** |
| Task type | Built-in Java | Built-in Java, GLUE Java, Shell, Python and other scripts | Built-in Java, external Java (FatJar), Shell, Python and other scripts | **Built-in Java, external Java (container), Shell, Python and other scripts** |
| Distributed strategy | Unsupported | Static sharding | MapReduce dynamic sharding | **MapReduce dynamic sharding** |
| Online task management | Unsupported | Supported | Supported | **Supported** |
| Online logging | Unsupported | Supported | Unsupported | **Supported** |
| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** |
| Alarm monitoring | Unsupported | Email | SMS | **Email, WebHook, DingTalk. An interface is provided for customization.** |
| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | RMB (Public Beta version for free, hey, helping to promote) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
| workflow | Unsupported | Unsupported | Supported | **Supported** |
# Document
**[Docs](https://www.yuque.com/powerjob/en/introduce)**
<p align="center">
**[中文文档](https://www.yuque.com/powerjob/product)**
# User Registration
[Click to register as PowerJob user and contribute to PowerJob!](https://github.com/KFCFans/PowerJob/issues/6)
ღ( ´・ᴗ・\` )ღ Many thanks to the following registered users. ღ( ´・ᴗ・\` )ღ
<p style="text-align: center">
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/user.png" alt="PowerJob User" title="PowerJob User"/>
</p>
# 其他
* 开源许可证Apache License, Version 2.0
* 欢迎共同参与本项目的贡献PR和Issue都大大滴欢迎求求了
* 觉得还不错的话可以点个Star支持一下哦 = ̄ω ̄=
* 联系方式@KFCFans -> `tengjiqi@gmail.com`
* 用户交流QQ群487453839
# Others
- PowerJob is permanently open source software(Apache License, Version 2.0), please feel free to try, deploy and put into production!
- Author of PowerJob (@KFCFans) has abundant time for maintenance, and is willing to provide technical support if you have needs!
- Welcome to contribute to PowerJob, both Pull Requests and Issues are precious.
- Please STAR PowerJob if it is valuable. ~ =  ̄ω ̄ =
- Do you need any help or want to propose suggestions? Please raise Github issues or contact the Author @KFCFans-> `tengjiqi@gmail.com` directly.

View File

@ -1,83 +0,0 @@
<p style="text-align: center">
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/logo.png" alt="PowerJob" title="PowerJob" width="557"/>
</p>
<p style="text-align: center">
<a href="https://github.com/KFCFans/PowerJob/actions"><img src="https://github.com/KFCFans/PowerJob/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master" alt="actions"></a>
<a href="https://search.maven.org/search?q=com.github.kfcfans"><img alt="Maven Central" src="https://img.shields.io/maven-central/v/com.github.kfcfans/powerjob-worker"></a>
<a href="https://github.com/KFCFans/PowerJob/releases"><img alt="GitHub release (latest SemVer)" src="https://img.shields.io/github/v/release/kfcfans/powerjob?color=%23E59866"></a>
<a href="https://github.com/KFCFans/PowerJob/blob/master/LICENSE"><img src="https://img.shields.io/github/license/KFCFans/PowerJob" alt="LICENSE"></a>
</p>
- Have you ever wondered how cron jobs could be organized orderly?
- Have you ever felt upset when scheduling tasks suddenly terminated without any warning?
- Have you ever felt helpless when batches of business tasks require handling?
- Have you ever felt depressed about tasks that carry with complex dependencies?
Well, PowerJob is there for you, it is the choice of a new generation. It is a powerful, business-oriented scheduling framework that provides distributed computing ability. Based on Akka architecture, it makes everything with scheduling easier. Just with several steps, PowerJob could be deployed and work for you!
# Introduction
### Features
- Simple to use: PowerJob provides a friendly front-end Web that allows developers to visually manage tasks (Create, Read, Update and Delete), monitor tasks, and view logs online.
- Complete timing strategy: PowerJob supports four different scheduling strategies, including CRON expression, fixed frequency timing, fixed delay timing as well as the Open API.
- Various execution modes: PowerJob supports four execution modes: stand-alone, broadcast, Map, and MapReduce. It's worth mentioning the Map and MapReduce modes. With several lines of codes, developers could take full advantage of PowerJob's distributed computing ability.
- Complete workflow support. PowerJob supports DAG(Directed acyclic graph) based online task configuration. Developers could arrange tasks on the console, while data could be transferred among tasks on the flow.
- Extensive executor support: PowerJob supports multiple processors, including Spring Beans, ordinary Java objects, Shell, Python and so on.
- Simple in dependency: PowerJob aims to be simple in dependency. The only dependency is merely database (MySQL / Oracle / MS SQLServer ...), with MongoDB being the extra dependency for storing large log files online.
- High availability and performance: Unlike traditional job-scheduling frameworks that rely on database locks, PowerJob server is lock-free. PowerJob supports unlimited horizontal expansion. It's easy to achieve high availability and performance by deploying as many PowerJob server instances as you need.
- Quick failover and recovery support: Whenever any task failed, PowerJob server would retry according to the configured strategy. As long as there were enough nodes in the cluster, the failed tasks could execute successfully finally.
- Convenient to run and maintain: PowerJob supports online logging. Logs generated by the worker would be transferred and displayed on the console instantly, therefore reducing the cost of debugging and improving the efficiency significantly.
### Applicable scenes
- Scenarios with timed tasks: such as full synchronization of data at midnight, generating business reports at desired time.
- Scenarios that require all machines to run tasks simultaneously: such as log cleanup.
- Scenarios that require distributed processing: For example, a large amount of data requires updating, while the stand-alone execution takes quite a lot of time. The Map/MapReduce mode could be applied in which the workers would join the cluster for PowerJob server to dispatch, to speed up the time-consuming process, therefore improving the computing ability of the whole cluster.
- Scenarios with delayed tasks: For instance, disposal of overdue orders.
### Design goals
PowerJob aims to be an enterprise scheduling middleware. By deploying PowerJob-server as the scheduling center,
all the applications could gain scheduling and distributed computing ability relying on PowerJob-worker.
### Online trial
Trial address: [Online Trial Address](http://try.powerjob.tech/)
Application name: powerjob-agent-test
Application password: 123
### Comparison with similar products
| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob |
| ---------------------------------- | --------------------------------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
| Timing type | CRON | CRON | CRON, fixed frequency, fixed delay, OpenAPI | **CRON, fixed frequency, fixed delay, OpenAPI** |
| Task type | Built-in Java | Built-in Java, GLUE Java, Shell, Python and other scripts | Built-in Java, external Java (FatJar), Shell, Python and other scripts | **Built-in Java, external Java (container), Shell, Python and other scripts** |
| Distributed strategy | Unsupported | Static sharding | MapReduce dynamic sharding | **MapReduce dynamic sharding** |
| Online task management | Unsupported | Supported | Supported | **Supported** |
| Online logging | Unsupported | Supported | Unsupported | **Supported** |
| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** |
| Alarm monitoring | Unsupported | Email | SMS | **Email, WebHook, DingTalk. An interface is provided for customization.** |
| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | RMB (Public Beta version for free, hey, helping to promote) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
| workflow | Unsupported | Unsupported | Supported | **Supported** |
# Document
**[GitHub Wiki](https://github.com/KFCFans/PowerJob/wiki)**
**[中文文档](https://www.yuque.com/powerjob/product)**
# User Registration
[Click to register as PowerJob user and contribute to PowerJob!](https://github.com/KFCFans/PowerJob/issues/6)
ღ( ´・ᴗ・\` )ღ Many thanks to the following registered users. ღ( ´・ᴗ・\` )ღ
<p style="text-align: center">
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/user.png" alt="PowerJob User" title="PowerJob User"/>
</p>
# Others
- PowerJob is permanently open source software(Apache License, Version 2.0), please feel free to try, deploy and put into production!
- Author of PowerJob (@KFCFans) has abundant time for maintenance, and is willing to provide technical support if you have needs!
- Welcome to contribute to PowerJob, both Pull Requests and Issues are precious.
- Please STAR PowerJob if it is valuable. ~ =  ̄ω ̄ =
- Do you need any help or want to propose suggestions? Please raise Github issues or contact the Author @KFCFans-> `tengjiqi@gmail.com` directly.

78
README_zhCN.md Normal file
View File

@ -0,0 +1,78 @@
[English](./README.md) | 简体中文
<p align="center">
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/logo.png" alt="PowerJob" title="PowerJob" width="557"/>
</p>
<p align="center">
<a href="https://github.com/KFCFans/PowerJob/actions"><img src="https://github.com/KFCFans/PowerJob/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master" alt="actions"></a>
<a href="https://search.maven.org/search?q=com.github.kfcfans"><img alt="Maven Central" src="https://img.shields.io/maven-central/v/com.github.kfcfans/powerjob-worker"></a>
<a href="https://github.com/KFCFans/PowerJob/releases"><img alt="GitHub release (latest SemVer)" src="https://img.shields.io/github/v/release/kfcfans/powerjob?color=%23E59866"></a>
<a href="https://github.com/KFCFans/PowerJob/blob/master/LICENSE"><img src="https://img.shields.io/github/license/KFCFans/PowerJob" alt="LICENSE"></a>
</p>
PowerJob原OhMyScheduler是全新一代分布式调度与计算框架能让您轻松完成作业的调度与繁杂任务的分布式计算。
# 简介
### 主要特性
* 使用简单提供前端Web界面允许开发者可视化地完成调度任务的管理增、删、改、查、任务运行状态监控和运行日志查看等功能。
* 定时策略完善支持CRON表达式、固定频率、固定延迟和API四种定时调度策略。
* 执行模式丰富支持单机、广播、Map、MapReduce四种执行模式其中Map/MapReduce处理器能使开发者寥寥数行代码便获得集群分布式计算的能力。
* DAG工作流支持支持在线配置任务依赖关系可视化得对任务进行编排同时还支持上下游任务间的数据传递
* 执行器支持广泛支持Spring Bean、内置/外置Java类、Shell、Python等处理器应用范围广。
* 运维便捷支持在线日志功能执行器产生的日志可以在前端控制台页面实时显示降低debug成本极大地提高开发效率。
* 依赖精简最小仅依赖关系型数据库MySQL/Oracle/MS SQLServer...扩展依赖为MongoDB用于存储庞大的在线日志
* 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。
* 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。
### 适用场景
* 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。
* 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
* 有需要分布式处理的业务场景比如需要更新一大批数据单机执行耗时非常长可以使用Map/MapReduce处理器完成任务的分发调动整个集群加速计算。
* 有需要**延迟执行**某些任务的业务场景:比如订单过期处理等。
### 设计目标
PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。
### 在线试用
试用地址:[try.powerjob.tech](http://try.powerjob.tech/)
试用应用名称powerjob-agent-test
控制台密码123
[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn)
### 同类产品对比
| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob |
| -------------- | ------------------------ | ---------------------------------------- | ------------------------------------------------- | ------------------------------------------------------------ |
| 定时类型 | CRON | CRON | CRON、固定频率、固定延迟、OpenAPI | **CRON、固定频率、固定延迟、OpenAPI** |
| 任务类型 | 内置Java | 内置Java、GLUE Java、Shell、Python等脚本 | 内置Java、外置JavaFatJar、Shell、Python等脚本 | **内置Java、外置Java容器、Shell、Python等脚本** |
| 分布式计算 | 无 | 静态分片 | MapReduce动态分片 | **MapReduce动态分片** |
| 在线任务治理 | 不支持 | 支持 | 支持 | **支持** |
| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** |
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** |
| 报警监控 | 无 | 邮件 | 短信 | **WebHook、邮件、钉钉与自定义扩展** |
| 系统依赖 | JDBC支持的关系型数据库MySQL、Oracle... | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库MySQL、Oracle...** |
| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** |
# 官方文档
**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)**
**[Document](https://www.yuque.com/powerjob/en/xrdoqw)**
PS感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献!
# 接入登记
[点击进行接入登记,为 PowerJob 的发展贡献自己的力量!](https://github.com/KFCFans/PowerJob/issues/6)
ღ( ´・ᴗ・\` )ღ 感谢以下接入用户的大力支持 ღ( ´・ᴗ・\` )ღ
<p align="center">
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/user.png" alt="PowerJob User" title="PowerJob User"/>
</p>
# 其他
* 开源许可证Apache License, Version 2.0
* 欢迎共同参与本项目的贡献PR和Issue都大大滴欢迎求求了
* 觉得还不错的话可以点个Star支持一下哦 = ̄ω ̄=
* 联系方式@KFCFans -> `tengjiqi@gmail.com`
* 用户交流QQ群487453839

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.4.1</version>
<version>3.4.2</version>
<packaging>jar</packaging>
<properties>
<junit.version>5.6.1</junit.version>
<fastjson.version>1.2.68</fastjson.version>
<powerjob.common.version>3.4.1</powerjob.common.version>
<powerjob.common.version>3.4.2</powerjob.common.version>
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
</properties>

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.4.1</version>
<version>3.4.2</version>
<packaging>jar</packaging>
<properties>

View File

@ -0,0 +1,34 @@
package com.github.kfcfans.powerjob.common;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Objects;
/**
* 日志级别
*
* @author tjq
* @since 12/20/20
*/
@Getter
@AllArgsConstructor
public enum LogLevel {
DEBUG(1),
INFO(2),
WARN(3),
ERROR(4);
private final int v;
public static String genLogLevelString(Integer v) {
for (LogLevel logLevel : values()) {
if (Objects.equals(logLevel.v, v)) {
return logLevel.name();
}
}
return "UNKNOWN";
}
}

View File

@ -20,6 +20,8 @@ public class InstanceLogContent implements OmsSerializable {
private long instanceId;
// 日志提交时间
private long logTime;
// 级别
private int logLevel;
// 日志内容
private String logContent;
}

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.4.1</version>
<version>3.4.2</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.3.4.RELEASE</springboot.version>
<powerjob.common.version>3.4.1</powerjob.common.version>
<powerjob.common.version>3.4.2</powerjob.common.version>
<!-- 数据库驱动版本使用的是spring-boot-dependencies管理的版本 -->
<mysql.version>8.0.19</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version>
@ -214,6 +214,7 @@
<executions>
<execution>
<goals>
<goal>build-info</goal>
<goal>repackage</goal><!--可以把依赖的包都打包到生成的Jar包中-->
</goals>
</execution>

View File

@ -1,5 +1,8 @@
package com.github.kfcfans.powerjob.server.common.config;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.info.BuildProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
@ -11,26 +14,40 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2;
import static springfox.documentation.builders.PathSelectors.any;
/**
* Swagger UI 配置
* Configuration class for Swagger UI.
*
* @author tjq
* @author Jiang Jining
* @since 2020/3/29
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
private final BuildProperties buildProperties;
public SwaggerConfig(@Autowired(required = false) final BuildProperties buildProperties) {
this.buildProperties = buildProperties;
}
@Bean
public Docket createRestApi() {
String version = "unknown";
if (buildProperties != null) {
String pomVersion = buildProperties.getVersion();
if (StringUtils.isNotBlank(pomVersion)) {
version = pomVersion;
}
}
// apiInfo()用来创建该Api的基本信息这些基本信息会展现在文档页面中
ApiInfo apiInfo = new ApiInfoBuilder()
.title("PowerJob")
.description("Distributed scheduling and computing framework.")
.license("Apache Licence 2")
.termsOfServiceUrl("https://github.com/KFCFans/PowerJob")
.version("3.3.3")
.version(version)
.build();
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo)
// select()函数返回一个ApiSelectorBuilder实例
@ -39,5 +56,5 @@ public class SwaggerConfig {
.paths(any())
.build();
}
}

View File

@ -28,8 +28,8 @@ public class ThreadPoolConfig {
@Bean("omsTimingPool")
public Executor getTimingPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 16);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 32);
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
// use SynchronousQueue
executor.setQueueCapacity(0);
executor.setKeepAliveSeconds(60);

View File

@ -28,6 +28,10 @@ public class LocalInstanceLogDO {
* 日志时间
*/
private Long logTime;
/**
* 日志级别 {@link com.github.kfcfans.powerjob.common.LogLevel}
*/
private Integer logLevel;
/**
* 日志内容
*/

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.server.service;
import com.github.kfcfans.powerjob.common.LogLevel;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.InstanceLogContent;
@ -320,12 +321,16 @@ public class InstanceLogService {
/**
* 拼接日志 -> 2020-04-29 22:07:10.059 192.168.1.1:2777 INFO XXX
* 拼接日志 -> 2020-04-29 22:07:10.059 [192.168.1.1:2777] INFO XXX
* @param instanceLog 日志对象
* @return 字符串
*/
private static String convertLog(LocalInstanceLogDO instanceLog) {
return String.format("%s [%s] -%s", dateFormat.format(instanceLog.getLogTime()), instanceLog.getWorkerAddress(), instanceLog.getLogContent());
return String.format("%s [%s] %s %s",
dateFormat.format(instanceLog.getLogTime()),
instanceLog.getWorkerAddress(),
LogLevel.genLogLevelString(instanceLog.getLogLevel()),
instanceLog.getLogContent());
}

View File

@ -4,14 +4,11 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.OmsLockDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.OmsLockRepository;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 基于数据库实现的分布式锁
@ -26,35 +23,27 @@ public class DatabaseLockService implements LockService {
@Resource
private OmsLockRepository omsLockRepository;
private Map<String, AtomicInteger> lockName2FailedTimes = Maps.newConcurrentMap();
private static final int MAX_FAILED_NUM = 5;
@Override
public boolean lock(String name, long maxLockTime) {
AtomicInteger failedCount = lockName2FailedTimes.computeIfAbsent(name, ignore -> new AtomicInteger(0));
OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost(), maxLockTime);
try {
omsLockRepository.saveAndFlush(newLock);
failedCount.set(0);
return true;
}catch (DataIntegrityViolationException ignore) {
}catch (Exception e) {
} catch (DataIntegrityViolationException ignore) {
} catch (Exception e) {
log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e);
}
// 连续失败一段时间需要判断是否为锁释放失败的情况
if (failedCount.incrementAndGet() > MAX_FAILED_NUM) {
OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();
OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();
if (lockedMillions > omsLockDO.getMaxLockTime()) {
// 锁超时强制释放锁并重新尝试获取
if (lockedMillions > omsLockDO.getMaxLockTime()) {
log.warn("[DatabaseLockService] The lock({}) already timeout, will be deleted now.", omsLockDO);
unlock(name);
} else {
failedCount.set(0);
}
log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO);
unlock(name);
return lock(name, maxLockTime);
}
return false;
}

View File

@ -7,6 +7,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.lock.LockService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
@ -36,6 +37,8 @@ public class CleanService {
private InstanceInfoRepository instanceInfoRepository;
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Resource
private LockService lockService;
@Value("${oms.instanceinfo.retention}")
private int instanceInfoRetentionDay;
@ -51,6 +54,8 @@ public class CleanService {
// 每天凌晨3点定时清理
private static final String CLEAN_TIME_EXPRESSION = "0 0 3 * * ?";
private static final String HISTORY_DELETE_LOCK = "history_delete_lock";
@Async("omsTimingPool")
@Scheduled(cron = CLEAN_TIME_EXPRESSION)
@ -59,18 +64,35 @@ public class CleanService {
// 释放本地缓存
WorkerManagerService.cleanUp();
// 删除数据库运行记录
cleanInstanceLog();
cleanWorkflowInstanceLog();
// 释放磁盘空间
cleanLocal(OmsFileUtils.genLogDirPath(), instanceInfoRetentionDay);
cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay);
cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY);
// 删除 GridFS 过期文件
cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay);
cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay);
// 删除数据库历史的数据
cleanByOneServer();
}
/**
* 只能一台server清理的操作统一到这里执行
*/
private void cleanByOneServer() {
// 只要第一个server抢到锁其他server就会返回所以锁10分钟应该足够了
boolean lock = lockService.lock(HISTORY_DELETE_LOCK, 10 * 60 * 1000);
if (!lock) {
log.info("[CleanService] clean job is already running, just return.");
return;
}
try {
// 删除数据库运行记录
cleanInstanceLog();
cleanWorkflowInstanceLog();
// 删除 GridFS 过期文件
cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay);
cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay);
} finally {
lockService.unlock(HISTORY_DELETE_LOCK);
}
}
@VisibleForTesting
@ -91,7 +113,7 @@ public class CleanService {
}
// 计算最大偏移量
long maxOffset = day * 24 * 60 * 60 * 1000;
long maxOffset = day * 24 * 60 * 60 * 1000L;
for (File f : logFiles) {
long offset = System.currentTimeMillis() - f.lastModified();

View File

@ -60,7 +60,7 @@ public class InstanceStatusCheckService {
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Async("omsTimingPool")
@Scheduled(fixedRate = 10000)
@Scheduled(fixedDelay = 10000)
public void timingStatusCheck() {
Stopwatch stopwatch = Stopwatch.createStarted();
@ -115,7 +115,7 @@ public class InstanceStatusCheckService {
threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
List<InstanceInfoDO> waitingWorkerReceiveInstances = instanceInfoRepository.findByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold);
if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) {
log.warn("[InstanceStatusChecker] instances({}) didn't receive any reply from worker.", waitingWorkerReceiveInstances);
log.warn("[InstanceStatusChecker] find one instance didn't receive any reply from worker, try to redispatch: {}", waitingWorkerReceiveInstances);
waitingWorkerReceiveInstances.forEach(instance -> {
// 重新派发
JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new);

View File

@ -133,7 +133,7 @@ public class OmsScheduleService {
// 1. 批量写日志表
Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
log.info("[CronScheduler] These cron jobs will be scheduled {}.", jobInfos);
log.info("[CronScheduler] These cron jobs will be scheduled: {}.", jobInfos);
jobInfos.forEach(jobInfo -> {
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), null, null, jobInfo.getNextTriggerTime());

View File

@ -314,7 +314,7 @@ public class WorkflowInstanceManager {
}
/**
* 允许任务实例
* 运行任务实例
* 需要将创建和运行任务实例分离否则在秒失败情况下会发生DAG覆盖更新的问题
* @param jobId 任务ID
* @param instanceId 任务实例ID

View File

@ -17,6 +17,7 @@ import com.github.kfcfans.powerjob.server.web.request.QueryInstanceRequest;
import com.github.kfcfans.powerjob.server.web.response.InstanceDetailVO;
import com.github.kfcfans.powerjob.server.web.response.InstanceInfoVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.Page;
@ -28,6 +29,7 @@ import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.net.URL;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@ -92,6 +94,24 @@ public class InstanceController {
OmsFileUtils.file2HttpResponse(file, response);
}
@GetMapping("/downloadLog4Console")
public void downloadLog4Console(Long appId, Long instanceId , HttpServletResponse response) throws Exception {
// 获取内部下载链接
String downloadUrl = instanceLogService.fetchDownloadUrl(appId, instanceId);
// 先下载到本机
String logFilePath = OmsFileUtils.genTemporaryWorkPath() + String.format("powerjob-%s-%s.log", appId, instanceId);
File logFile = new File(logFilePath);
try {
FileUtils.copyURLToFile(new URL(downloadUrl), logFile);
// 再推送到浏览器
OmsFileUtils.file2HttpResponse(logFile, response);
} finally {
FileUtils.forceDelete(logFile);
}
}
@PostMapping("/list")
public ResultDTO<PageResult<InstanceInfoVO>> list(@RequestBody QueryInstanceRequest request) {

View File

@ -122,9 +122,6 @@ public class OpenAPIController {
/* ************* Workflow 区 ************* */
@PostMapping(OpenAPIConstant.SAVE_WORKFLOW)
public ResultDTO<Long> saveWorkflow(@RequestBody SaveWorkflowRequest request) throws Exception {
if (request.getId() != null) {
checkJobIdValid(request.getId(), request.getAppId());
}
return ResultDTO.success(workflowService.saveWorkflow(request));
}

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.4.1</version>
<version>3.4.2</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.4.1</powerjob.worker.version>
<powerjob.worker.version>3.4.2</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.4.1</version>
<version>3.4.2</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.starter.version>3.4.1</powerjob.worker.starter.version>
<powerjob.worker.starter.version>3.4.2</powerjob.worker.starter.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>3.4.1</version>
<version>3.4.2</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.4.1</powerjob.worker.version>
<powerjob.worker.version>3.4.2</powerjob.worker.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
</properties>

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.4.1</version>
<version>3.4.2</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.4.1</powerjob.common.version>
<powerjob.common.version>3.4.2</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.worker.background;
import akka.actor.ActorSelection;
import com.github.kfcfans.powerjob.common.LogLevel;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.model.InstanceLogContent;
import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq;
@ -48,14 +49,14 @@ public class OmsLogHandler {
* @param instanceId 任务实例ID
* @param logContent 日志内容
*/
public void submitLog(long instanceId, String logContent) {
public void submitLog(long instanceId, LogLevel logLevel, String logContent) {
if (logQueue.size() > REPORT_SIZE) {
// 线程的生命周期是个不可循环的过程一个线程对象结束了不能再次start只能一直创建和销毁
new Thread(logSubmitter).start();
}
InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logContent);
InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);
logQueue.offer(tuple);
}

View File

@ -1,10 +1,10 @@
package com.github.kfcfans.powerjob.worker.common.constants;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.List;
import java.util.Set;
/**
* 任务状态task_info 表中 status 字段的枚举值
@ -23,10 +23,10 @@ public enum TaskStatus {
WORKER_PROCESS_FAILED(5, "worker执行失败"),
WORKER_PROCESS_SUCCESS(6, "worker执行成功");
public static final List<Integer> finishedStatus = Lists.newArrayList(WORKER_PROCESS_FAILED.value, WORKER_PROCESS_SUCCESS.value);
public static final Set<Integer> finishedStatus = Sets.newHashSet(WORKER_PROCESS_FAILED.value, WORKER_PROCESS_SUCCESS.value);
private int value;
private String des;
private final int value;
private final String des;
public static TaskStatus of(int v) {
for (TaskStatus taskStatus : values()) {

View File

@ -177,9 +177,9 @@ public class ProcessorRunnable implements Runnable {
if (!success) {
// 插入重试队列等待重试
statusReportRetryQueue.add(req);
log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed.", task.getInstanceId(), task.getTaskId(), status, result);
log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed, will retry later", task.getInstanceId(), task.getTaskId(), status, result);
}
}else {
} else {
taskTrackerActor.tell(req, null);
}
}

View File

@ -55,8 +55,10 @@ public class ProcessorTracker {
private OmsLogger omsLogger;
// ProcessResult 上报失败的重试队列
private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
// 上一次空闲时间
// 上一次空闲时间用于闲置判定
private long lastIdleTime;
// 上次完成任务数量用于闲置判定
private long lastCompletedTaskCount;
private String taskTrackerAddress;
private ActorSelection taskTrackerActorRef;
@ -88,6 +90,7 @@ public class ProcessorTracker {
this.omsLogger = new OmsServerLogger(instanceId);
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
this.lastIdleTime = -1L;
this.lastCompletedTaskCount = 0L;
// 初始化 线程池TimingPool 启动的任务会检查 ThreadPool所以必须先初始化线程池否则NPE
initThreadPool();
@ -239,8 +242,9 @@ public class ProcessorTracker {
}
// 判断线程池活跃状态长时间空闲则上报 TaskTracker 请求检查
if (threadPool.getActiveCount() > 0) {
if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) {
lastIdleTime = -1;
lastCompletedTaskCount = threadPool.getCompletedTaskCount();
}else {
if (lastIdleTime == -1) {
lastIdleTime = System.currentTimeMillis();
@ -264,6 +268,7 @@ public class ProcessorTracker {
req.setReportTime(System.currentTimeMillis());
if (!AkkaUtils.reliableTransmit(taskTrackerActorRef, req)) {
statusReportRetryQueue.add(req);
log.warn("[ProcessorRunnable-{}] retry report finished task status failed: {}", instanceId, req);
return;
}
}

View File

@ -56,8 +56,7 @@ public class CommonTaskTracker extends TaskTracker {
// 持久化根任务
persistenceRootTask();
// 启动定时任务任务派发 & 状态检查
scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 0, 5, TimeUnit.SECONDS);
// 开启定时状态检查
scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 13, 13, TimeUnit.SECONDS);
// 如果是 MR 任务则需要启动执行器动态检测装置
@ -65,6 +64,9 @@ public class CommonTaskTracker extends TaskTracker {
if (executeType == ExecuteType.MAP || executeType == ExecuteType.MAP_REDUCE) {
scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES);
}
// 最后启动任务派发器否则会出现 TaskTracker 还未创建完毕 ProcessorTracker 已开始汇报状态的情况
scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 10, 5000, TimeUnit.MILLISECONDS);
}
@Override

View File

@ -119,7 +119,7 @@ public abstract class TaskTracker {
case FIX_DELAY:return new FrequentTaskTracker(req);
default:return new CommonTaskTracker(req);
}
}catch (Exception e) {
} catch (Exception e) {
log.warn("[TaskTracker-{}] create TaskTracker from request({}) failed.", req.getInstanceId(), req, e);
// 直接发送失败请求
@ -301,7 +301,7 @@ public abstract class TaskTracker {
return;
}
log.info("[TaskTracker-{}] finished broadcast's preProcess.", instanceId);
log.info("[TaskTracker-{}-{}] finished broadcast's preProcess, preExecuteSuccess:{},preTaskId:{},result:{}", instanceId, subInstanceId, preExecuteSuccess, preTaskId, result);
// 生成集群子任务
if (preExecuteSuccess) {
@ -316,7 +316,7 @@ public abstract class TaskTracker {
}
submitTask(subTaskList);
}else {
log.debug("[TaskTracker-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", instanceId, result);
log.warn("[TaskTracker-{}-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", instanceId, subInstanceId, result);
}
}
@ -446,7 +446,7 @@ public abstract class TaskTracker {
// 3. 避免大查询分批派发任务
long currentDispatchNum = 0;
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2;
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
AtomicInteger index = new AtomicInteger(0);
// 4. 循环查询数据库获取需要派发的任务

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.worker.log;
/**
* OhMyScheduler 在线日志直接上报到 Server可在控制台直接查看
* PowerJob 在线日志直接上报到 Server可在控制台直接查看
*
* @author tjq
* @since 2020/4/21

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.worker.log.impl;
import com.github.kfcfans.powerjob.common.LogLevel;
import com.github.kfcfans.powerjob.worker.background.OmsLogHandler;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import lombok.AllArgsConstructor;
@ -9,7 +10,7 @@ import org.slf4j.helpers.MessageFormatter;
/**
* OhMyScheduler 在线日志直接上报到 Server可在控制台直接查看
* PowerJob 在线日志直接上报到 Server可在控制台直接查看
*
* @author tjq
* @since 2020/4/21
@ -19,45 +20,35 @@ public class OmsServerLogger implements OmsLogger {
private final long instanceId;
// Level|业务方自身的日志
private static final String LOG_PREFIX = "{} ";
@Override
public void debug(String messagePattern, Object... args) {
process("DEBUG", messagePattern, args);
process(LogLevel.DEBUG, messagePattern, args);
}
@Override
public void info(String messagePattern, Object... args) {
process("INFO", messagePattern, args);
process(LogLevel.INFO, messagePattern, args);
}
@Override
public void warn(String messagePattern, Object... args) {
process("WARN", messagePattern, args);
process(LogLevel.WARN, messagePattern, args);
}
@Override
public void error(String messagePattern, Object... args) {
process("ERROR", messagePattern, args);
process(LogLevel.ERROR, messagePattern, args);
}
/**
* 生成日志内容
* @param level 级别DEBUG/INFO/WARN/ERROR
* @param messagePattern 日志格式
* @param arg 填充参数
* @return 生成完毕的日志内容
*/
private static String genLog(String level, String messagePattern, Object... arg) {
String pattern = LOG_PREFIX + messagePattern;
Object[] newArgs = new Object[arg.length + 1];
newArgs[0] = level;
System.arraycopy(arg, 0, newArgs, 1, arg.length);
private static String genLogContent(String messagePattern, Object... arg) {
// 借用 Slf4J 直接生成日志信息
FormattingTuple formattingTuple = MessageFormatter.arrayFormat(pattern, newArgs);
FormattingTuple formattingTuple = MessageFormatter.arrayFormat(messagePattern, arg);
if (formattingTuple.getThrowable() != null) {
String stackTrace = ExceptionUtils.getStackTrace(formattingTuple.getThrowable());
return formattingTuple.getMessage() + System.lineSeparator() + stackTrace;
@ -66,9 +57,9 @@ public class OmsServerLogger implements OmsLogger {
}
}
private void process(String level, String messagePattern, Object... args) {
String logContent = genLog(level, messagePattern, args);
OmsLogHandler.INSTANCE.submitLog(instanceId, logContent);
private void process(LogLevel level, String messagePattern, Object... args) {
String logContent = genLogContent(messagePattern, args);
OmsLogHandler.INSTANCE.submitLog(instanceId, level, logContent);
}
}