mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
[release] v3.4.0

commit 143663e333
@ -47,7 +47,7 @@ PowerJob aims to be an enterprise-grade distributed job scheduling platform
| Online task management | Unsupported | Supported | Supported | **Supported** |
| Online logging | Unsupported | Supported | Unsupported | **Supported** |
| Scheduling methods and performance | Based on database lock, performance bottleneck | Based on database lock, performance bottleneck | Unknown | **Lock-free design, no performance ceiling** |
-| Alarm monitoring | None | Email | SMS | **Email and DingTalk, with developer extensions supported** |
+| Alarm monitoring | None | Email | SMS | **WebHook, Email, DingTalk and custom extensions** |
| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | RMB | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
| DAG workflow | Unsupported | Unsupported | Supported | **Supported** |
@ -9,26 +9,31 @@
<a href="https://github.com/KFCFans/PowerJob/blob/master/LICENSE"><img src="https://img.shields.io/github/license/KFCFans/PowerJob" alt="LICENSE"></a>
</p>

PowerJob is a powerful distributed scheduling platform and distributed computing framework based on Akka architecture. It lets you schedule jobs and run distributed computations with ease.

- Have you ever wondered how cron jobs could be organized in an orderly way?
- Have you ever felt upset when scheduled tasks suddenly terminated without any warning?
- Have you ever felt helpless when batches of business tasks required handling?
- Have you ever felt depressed about tasks that carry complex dependencies?

Well, PowerJob is there for you; it is the choice of a new generation. It is a powerful, business-oriented scheduling framework that provides distributed computing ability. Based on Akka architecture, it makes everything about scheduling easier. With just a few steps, PowerJob can be deployed and working for you!

# Introduction

### Features
-- Simple to use: Provides a front-end Web interface that allows developers to visually complete the management of scheduled tasks (create, delete, update, and query), task operation status monitoring, and operation logs viewing.
-- Complete timing strategy: Supports four timing scheduling strategies: CRON expression, fixed frequency, fixed delay, and API.
-- Extensive execution modes: Supports four execution modes: stand-alone, broadcast, Map, and MapReduce. The Map/MapReduce processors enable developers to obtain cluster distributed computing capabilities with only a few lines of code.
-- Workflow (DAG) support: Supports online configuration of task dependencies and visual arrangement of tasks, as well as data transfer between upstream and downstream tasks.
-- Extensive executor support: Supports processors such as Spring Beans, ordinary Java objects, Shell, and Python, covering a wide range of applications (such as broadcast execution + Shell script to clear logs).
-- Convenient operation and maintenance: Supports online logging; logs generated by the worker can be displayed on the front-end console page in real time, reducing debugging cost and greatly improving development efficiency.
-- Dependency simplification: The minimum dependency is only a database (MySQL / Oracle / MS SQLServer ...); the optional extra dependency is MongoDB (used to store huge online logs).
-- High availability & high performance: The scheduling server has been carefully designed to replace the database-lock strategy of other scheduling frameworks with lock-free scheduling. Deploying multiple scheduling servers achieves both high availability and performance gains (unlimited horizontal expansion is supported).
-- Failover and recovery: After a task fails, it is retried according to the configured retry strategy. As long as the executor cluster has enough computing nodes, the task can complete successfully.
+- Simple to use: PowerJob provides a friendly front-end Web interface that allows developers to visually manage tasks (create, read, update and delete), monitor task status, and view operation logs online.
+- Complete timing strategy: PowerJob supports four different scheduling strategies, including CRON expression, fixed frequency, fixed delay, as well as the Open API.
+- Various execution modes: PowerJob supports four execution modes: stand-alone, broadcast, Map, and MapReduce. The Map and MapReduce modes are worth mentioning: with only a few lines of code, developers can take full advantage of PowerJob's distributed computing ability.
+- Complete workflow support: PowerJob supports DAG (directed acyclic graph) based online task configuration. Developers can arrange tasks on the console, and data can be transferred between tasks along the flow.
+- Extensive executor support: PowerJob supports multiple processors, including Spring Beans, ordinary Java objects, Shell, Python and so on.
+- Simple in dependency: PowerJob aims to be simple in dependency. The only required dependency is a database (MySQL / Oracle / MS SQLServer ...), with MongoDB as an optional extra for storing huge online logs.
+- High availability and performance: Unlike traditional job-scheduling frameworks that rely on database locks, the PowerJob server is lock-free when scheduling. PowerJob supports unlimited horizontal expansion; high availability and performance come simply from deploying as many PowerJob server instances as you need.
+- Quick failover and recovery: Whenever a task fails, the PowerJob server retries it according to the configured strategy. As long as there are enough nodes in the cluster, the failed task can eventually execute successfully.
+- Convenient to run and maintain: PowerJob supports online logging. Logs generated by the worker are transferred and displayed on the console instantly, reducing the cost of debugging and significantly improving developer efficiency.
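The processor-centric execution model described above can be illustrated with a tiny sketch. The types below (`BasicProcessor`, `TaskContext`, `ProcessResult`) are simplified stand-ins modeled on the idea, not PowerJob's actual worker SDK:

```java
public class DemoProcessor {

    // What the scheduler hands to a processor at trigger time (illustrative).
    static class TaskContext {
        final long instanceId;
        final String jobParams;
        TaskContext(long instanceId, String jobParams) {
            this.instanceId = instanceId;
            this.jobParams = jobParams;
        }
    }

    // What a processor reports back when it finishes (illustrative).
    static class ProcessResult {
        final boolean success;
        final String msg;
        ProcessResult(boolean success, String msg) {
            this.success = success;
            this.msg = msg;
        }
    }

    // Stand-alone execution mode: one processor invocation per trigger.
    interface BasicProcessor {
        ProcessResult process(TaskContext context) throws Exception;
    }

    public static void main(String[] args) throws Exception {
        BasicProcessor processor = context -> {
            // business logic goes here; jobParams carries console-configured parameters
            return new ProcessResult(true, "processed instance " + context.instanceId);
        };
        ProcessResult result = processor.process(new TaskContext(1L, "{}"));
        System.out.println(result.success + " " + result.msg);
    }
}
```

In the broadcast and Map/MapReduce modes the same processor contract is invoked once per node or once per sub-task, which is how a few lines of business code gain cluster-wide execution.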
### Applicable scenes

-- Business scenarios with scheduled execution requirements: such as fully synchronizing data every morning and generating business reports.
-- Business scenarios that require all machines to run a task together: such as log cleanup.
-- Business scenarios that require distributed processing: for example, a large amount of data needs updating and stand-alone execution takes too long; the Map/MapReduce processors can distribute the task and mobilize the entire cluster to speed up the computation.
+- Scenarios with timed tasks: such as full synchronization of data at midnight, or generating business reports at a desired time.
+- Scenarios that require all machines to run tasks simultaneously: such as log cleanup.
+- Scenarios that require distributed processing: for example, a large amount of data requires updating and stand-alone execution would take too long. The Map/MapReduce mode lets the PowerJob server dispatch the work across the worker cluster, speeding up the time-consuming process and improving the computing ability of the whole cluster.

### Comparison of similar products
@ -36,14 +41,14 @@ PowerJob is a powerful distributed scheduling platform and distributed computing
| ---------------------------------- | --------------------------------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
| Timing type | CRON | CRON | CRON, fixed frequency, fixed delay, OpenAPI | **CRON, fixed frequency, fixed delay, OpenAPI** |
| Task type | Built-in Java | Built-in Java, GLUE Java, Shell, Python and other scripts | Built-in Java, external Java (FatJar), Shell, Python and other scripts | **Built-in Java, external Java (container), Shell, Python and other scripts** |
-| Distributed task | no | Static sharding | MapReduce dynamic sharding | **MapReduce dynamic sharding** |
-| Online task governance | not support | support | support | **support** |
-| Log blanking | not support | support | not support | **support** |
+| Distributed strategy | Unsupported | Static sharding | MapReduce dynamic sharding | **MapReduce dynamic sharding** |
+| Online task management | Unsupported | Supported | Supported | **Supported** |
+| Online logging | Unsupported | Supported | Unsupported | **Supported** |
| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** |
-| Alarm monitoring | no | mail | SMS | **Email, providing an interface to allow developers to customize development** |
-| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | Renminbi (free during public beta, hey, help to advertise) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
-| workflow | not support | not support | support | **support** |
+| Alarm monitoring | Unsupported | Email | SMS | **Email, WebHook, DingTalk. An interface is provided for customization.** |
+| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | RMB (free during public beta, hey, help to advertise) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
+| workflow | Unsupported | Unsupported | Supported | **Supported** |
# Document
**[GitHub Wiki](https://github.com/KFCFans/PowerJob/wiki)**

@ -51,7 +56,8 @@ PowerJob is a powerful distributed scheduling platform and distributed computing

# Others
-- The product is permanently open source (Apache License, Version 2.0) and free to use; the current developer @KFCFans has sufficient time to maintain the project and provide free technical support. Welcome to try it!
-- You are welcome to contribute to this project; PRs and Issues are greatly appreciated ~
-- If you find it useful, you can give it a star to support it ~ =  ̄ω ̄ =
-- Need some help or have some advice? Feel free to contact the developer @KFCFans -> `tengjiqi@gmail.com`
+- PowerJob is permanently open source software (Apache License, Version 2.0); please feel free to try, use or deploy it!
+- The owner of PowerJob (@KFCFans) has abundant time for maintenance and is willing to provide technical support if you need it!
+- You are welcome to contribute to PowerJob; both Pull Requests and Issues are precious.
+- Please STAR PowerJob if you find it valuable. ~ =  ̄ω ̄ =
+- Need any help, or want to propose suggestions? Please raise a GitHub issue or contact the author @KFCFans -> `tengjiqi@gmail.com` directly.

Binary file not shown.
Before Width: | Height: | Size: 618 KiB | After Width: | Height: | Size: 162 KiB
@ -5,13 +5,13 @@
 Source Server Type    : MySQL
 Source Server Version : 80021
 Source Host           : localhost:3306
-Source Schema         : powerjob-daily
+Source Schema         : powerjob-db-template

 Target Server Type    : MySQL
 Target Server Version : 80021
 File Encoding         : 65001

-Date: 08/10/2020 12:39:10
+Date: 28/11/2020 17:05:50
*/

SET NAMES utf8mb4;
@ -30,7 +30,7 @@ CREATE TABLE `app_info` (
  `password` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `appNameUK` (`app_name`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- ----------------------------
-- Table structure for container_info
@ -77,7 +77,7 @@ CREATE TABLE `instance_info` (
  KEY `IDX5b1nhpe5je7gc5s1ur200njr7` (`job_id`),
  KEY `IDXjnji5lrr195kswk6f7mfhinrs` (`app_id`),
  KEY `IDXa98hq3yu0l863wuotdjl7noum` (`instance_id`)
-) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- ----------------------------
-- Table structure for job_info
@ -111,7 +111,7 @@ CREATE TABLE `job_info` (
  `time_expression_type` int DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `IDXk2xprmn3lldmlcb52i36udll1` (`app_id`)
-) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- ----------------------------
-- Table structure for oms_lock
@ -126,7 +126,7 @@ CREATE TABLE `oms_lock` (
  `ownerip` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `lockNameUK` (`lock_name`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- ----------------------------
-- Table structure for server_info
@ -148,12 +148,12 @@ DROP TABLE IF EXISTS `user_info`;
CREATE TABLE `user_info` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `email` varchar(255) DEFAULT NULL,
-  `extra` varchar(255) DEFAULT NULL,
  `gmt_create` datetime(6) DEFAULT NULL,
  `gmt_modified` datetime(6) DEFAULT NULL,
  `password` varchar(255) DEFAULT NULL,
  `phone` varchar(255) DEFAULT NULL,
  `username` varchar(255) DEFAULT NULL,
+  `extra` varchar(255) DEFAULT NULL,
  `web_hook` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
@ -178,7 +178,7 @@ CREATE TABLE `workflow_info` (
  `wf_name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `IDX7uo5w0e3beeho3fnx9t7eiol3` (`app_id`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- ----------------------------
-- Table structure for workflow_instance_info
@ -189,6 +189,7 @@ CREATE TABLE `workflow_instance_info` (
  `actual_trigger_time` bigint DEFAULT NULL,
  `app_id` bigint DEFAULT NULL,
  `dag` longtext,
+  `expected_trigger_time` bigint DEFAULT NULL,
  `finished_time` bigint DEFAULT NULL,
  `gmt_create` datetime(6) DEFAULT NULL,
  `gmt_modified` datetime(6) DEFAULT NULL,
@ -198,6 +199,6 @@ CREATE TABLE `workflow_instance_info` (
  `wf_instance_id` bigint DEFAULT NULL,
  `workflow_id` bigint DEFAULT NULL,
  PRIMARY KEY (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

SET FOREIGN_KEY_CHECKS = 1;
@ -10,13 +10,13 @@

    <modelVersion>4.0.0</modelVersion>
    <artifactId>powerjob-client</artifactId>
-   <version>3.3.3</version>
+   <version>3.4.0</version>
    <packaging>jar</packaging>

    <properties>
        <junit.version>5.6.1</junit.version>
        <fastjson.version>1.2.68</fastjson.version>
-       <powerjob.common.version>3.3.3</powerjob.common.version>
+       <powerjob.common.version>3.4.0</powerjob.common.version>

        <mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
    </properties>
@ -10,7 +10,7 @@

    <modelVersion>4.0.0</modelVersion>
    <artifactId>powerjob-common</artifactId>
-   <version>3.3.3</version>
+   <version>3.4.0</version>
    <packaging>jar</packaging>

    <properties>
@ -11,7 +11,7 @@ public class SystemInstanceResult {

    /* *********** for ordinary instances *********** */

    // too many task instances running at the same time
-   public static final String TOO_MUCH_INSTANCE = "too much instance(%d>%d)";
+   public static final String TOO_MANY_INSTANCES = "too many instances(%d>%d)";
    // no worker available
    public static final String NO_WORKER_AVAILABLE = "no worker available";
    // task execution timed out
@ -24,7 +24,8 @@ public enum WorkflowInstanceStatus {

    // generalized running statuses
    public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING.v, RUNNING.v);
    // terminal statuses
+   public static final List<Integer> finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v);

    private int v;
    private String des;
@ -27,6 +27,8 @@ public class WorkflowInstanceInfoDTO {
    private String dag;
    private String result;

+   // expected trigger time
+   private Long expectedTriggerTime;
    // actual trigger time
    private Long actualTriggerTime;
    // finished time
@ -10,13 +10,13 @@

    <modelVersion>4.0.0</modelVersion>
    <artifactId>powerjob-server</artifactId>
-   <version>3.3.3</version>
+   <version>3.4.0</version>
    <packaging>jar</packaging>

    <properties>
        <swagger.version>2.9.2</swagger.version>
        <springboot.version>2.3.4.RELEASE</springboot.version>
-       <powerjob.common.version>3.3.3</powerjob.common.version>
+       <powerjob.common.version>3.4.0</powerjob.common.version>
        <!-- database driver versions, managed by spring-boot-dependencies -->
        <mysql.version>8.0.19</mysql.version>
        <ojdbc.version>19.7.0.0</ojdbc.version>
@ -0,0 +1,58 @@
package com.github.kfcfans.powerjob.server.common;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.RejectedExecutionHandler;

/**
 * Rejection strategies for thread pools.
 *
 * @author tjq
 * @since 2020/11/28
 */
@Slf4j
public class RejectedExecutionHandlerFactory {

    /**
     * Drop the task directly.
     * @param source log name
     * @return A handler for tasks that cannot be executed by the thread pool
     */
    public static RejectedExecutionHandler newReject(String source) {
        return (r, p) -> {
            log.error("[{}] ThreadPool[{}] overload, the task[{}] will be dropped!", source, p, r);
            log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
        };
    }

    /**
     * Run the task on the caller thread.
     * @param source log name
     * @return A handler for tasks that cannot be executed by the thread pool
     */
    public static RejectedExecutionHandler newCallerRun(String source) {
        return (r, p) -> {
            log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread!", source, p, r);
            log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
            if (!p.isShutdown()) {
                r.run();
            }
        };
    }

    /**
     * Run the task on a new thread.
     * @param source log name
     * @return A handler for tasks that cannot be executed by the thread pool
     */
    public static RejectedExecutionHandler newThreadRun(String source) {
        return (r, p) -> {
            log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!", source, p, r);
            log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
            if (!p.isShutdown()) {
                new Thread(r).start();
            }
        };
    }

}
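A standalone sketch of the caller-run strategy above (the handler is re-implemented inline, without Lombok logging, so the snippet compiles on its own) demonstrates the back-pressure effect: once the pool is saturated, the submitting thread runs the task itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunDemo {

    // Same idea as newCallerRun above: rejected tasks run on the caller's thread.
    static RejectedExecutionHandler newCallerRun(String source) {
        return (r, p) -> {
            if (!p.isShutdown()) {
                r.run(); // executes on the submitting thread, applying back-pressure
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new SynchronousQueue<>(),           // no queueing: a second submit is rejected
                newCallerRun("demo"));
        CountDownLatch blocker = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        pool.execute(() -> {                        // occupies the single worker thread
            try { blocker.await(); } catch (InterruptedException ignored) { }
        });
        pool.execute(ran::incrementAndGet);         // rejected -> runs inline on the main thread
        System.out.println("rejected task ran inline: " + (ran.get() == 1));
        blocker.countDown();
        pool.shutdown();
    }
}
```

Because the handler runs the rejected task synchronously inside `execute`, the counter is already incremented when the print statement runs, so the demo prints `rejected task ran inline: true`.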
@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.server.common.config;

+import com.github.kfcfans.powerjob.server.common.RejectedExecutionHandlerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -33,11 +34,7 @@ public class ThreadPoolConfig {
        executor.setQueueCapacity(0);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("omsTimingPool-");
-       executor.setRejectedExecutionHandler((r, e) -> {
-           log.warn("[OmsTimingService] timing pool can't schedule job immediately, maybe some job using too much cpu times.");
-           // timing tasks have high priority: keep executing at any cost, on a new thread
-           new Thread(r).start();
-       });
+       executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PowerJobTimingPool"));
        return executor;
    }

@ -49,7 +46,7 @@ public class ThreadPoolConfig {
        executor.setQueueCapacity(8192);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("omsBackgroundPool-");
-       executor.setRejectedExecutionHandler(new LogOnRejected());
+       executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newReject("PowerJobBackgroundPool"));
        return executor;
    }

@ -63,11 +60,4 @@ public class ThreadPoolConfig {
        return scheduler;
    }

-   private static final class LogOnRejected implements RejectedExecutionHandler {
-
-       @Override
-       public void rejectedExecution(Runnable r, ThreadPoolExecutor p) {
-           log.error("[OmsThreadPool] Task({}) rejected from pool({}).", r, p);
-       }
-   }
}
@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.common.utils.timewheel;

import com.github.kfcfans.powerjob.common.utils.CommonUtils;
+import com.github.kfcfans.powerjob.server.common.RejectedExecutionHandlerFactory;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -66,9 +67,9 @@ public class HashedWheelTimer implements Timer {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
        BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(16);
        int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
-       taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
+       taskProcessPool = new ThreadPoolExecutor(core, 4 * core,
                60, TimeUnit.SECONDS,
-               queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
+               queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool"));
    }

    startTime = System.currentTimeMillis();
@ -172,6 +173,11 @@ public class HashedWheelTimer implements Timer {

    removeIf(timerFuture -> {

+       // after processCanceledTasks, external cancellation may still leave CANCELED tasks in the bucket
+       if (timerFuture.status == HashedWheelTimerFuture.CANCELED) {
+           return true;
+       }
+
        if (timerFuture.status != HashedWheelTimerFuture.WAITING) {
            log.warn("[HashedWheelTimer] impossible, please fix the bug");
            return true;
@ -48,6 +48,8 @@ public class WorkflowInstanceInfoDO {
    @Column
    private String result;

+   // expected trigger time
+   private Long expectedTriggerTime;
    // actual trigger time
    private Long actualTriggerTime;
    // finished time
@ -71,6 +71,6 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
    // the result can only be received as int
    @Modifying
    @Transactional
-   @Query(value = "delete from InstanceInfoDO where gmtModified < ?1")
-   int deleteAllByGmtModifiedBefore(Date time);
+   @Query(value = "delete from InstanceInfoDO where gmtModified < ?1 and status in ?2")
+   int deleteAllByGmtModifiedBeforeAndStatusIn(Date time, List<Integer> status);
}
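The switch from `deleteAllByGmtModifiedBefore` to `deleteAllByGmtModifiedBeforeAndStatusIn` narrows cleanup to terminal-state records, so a long-running instance is no longer deleted just because it is old. The selection logic can be sketched in plain Java (the `Instance` class and status codes here are illustrative, not the project's entities):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CleanupFilterDemo {

    // Illustrative miniature of an instance row: modified timestamp + status code.
    static class Instance {
        final long id;
        final long gmtModified;
        final int status;
        Instance(long id, long gmtModified, int status) {
            this.id = id; this.gmtModified = gmtModified; this.status = status;
        }
    }

    // Hypothetical terminal status codes (e.g. SUCCEED, FAILED).
    static final Set<Integer> FINISHED = new HashSet<>(Arrays.asList(3, 4));

    // Mirrors the new query: modified before the threshold AND in a terminal state.
    static List<Instance> selectDeletable(List<Instance> all, long before) {
        List<Instance> out = new ArrayList<>();
        for (Instance i : all) {
            if (i.gmtModified < before && FINISHED.contains(i.status)) {
                out.add(i);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Instance> rows = Arrays.asList(
                new Instance(1, 100, 3),   // old and finished    -> deletable
                new Instance(2, 100, 1),   // old but running     -> kept
                new Instance(3, 900, 3));  // finished but recent -> kept
        System.out.println(selectDeletable(rows, 500).size()); // 1
    }
}
```

With the old query, row 2 (stale but still running) would also have matched, losing a live instance's record.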
@ -24,11 +24,11 @@ public interface WorkflowInstanceInfoRepository extends JpaRepository<WorkflowIn
    // the result can only be received as int
    @Modifying
    @Transactional
-   @Query(value = "delete from WorkflowInstanceInfoDO where gmtModified < ?1")
-   int deleteAllByGmtModifiedBefore(Date time);
+   @Query(value = "delete from WorkflowInstanceInfoDO where gmtModified < ?1 and status in ?2")
+   int deleteAllByGmtModifiedBeforeAndStatusIn(Date time, List<Integer> status);

    int countByWorkflowIdAndStatusIn(Long workflowId, List<Integer> status);

    // status check
-   List<WorkflowInstanceInfoDO> findByAppIdInAndStatusAndGmtModifiedBefore(List<Long> appIds, int status, Date before);
+   List<WorkflowInstanceInfoDO> findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> appIds, int status, long time);
}
@ -84,7 +84,7 @@ public class DispatchService {
        long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()));
        // exceeds the maximum number of concurrently running instances: skip dispatch
        if (runningInstanceCount > maxInstanceNum) {
-           String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, maxInstanceNum);
+           String result = String.format(SystemInstanceResult.TOO_MANY_INSTANCES, runningInstanceCount, maxInstanceNum);
            log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance is running ({} > {}).", jobId, instanceId, runningInstanceCount, maxInstanceNum);
            instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now);
@ -0,0 +1,71 @@
package com.github.kfcfans.powerjob.server.service;

import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.time.DateFormatUtils;

import java.text.ParseException;
import java.util.Collections;
import java.util.Date;
import java.util.List;

/**
 * Validation service.
 *
 * @author tjq
 * @since 2020/11/28
 */
public class ValidateService {

    private static final int NEXT_N_TIMES = 5;

    /**
     * Calculate the upcoming runs of the given time expression.
     * @param timeExpressionType type of the time expression
     * @param timeExpression the time expression itself
     * @return the next N trigger times
     * @throws Exception on parse failure
     */
    public static List<String> calculateNextTriggerTime(TimeExpressionType timeExpressionType, String timeExpression) throws Exception {
        switch (timeExpressionType) {
            case API: return Lists.newArrayList(OmsConstant.NONE);
            case WORKFLOW: return Lists.newArrayList("VALID: depends on workflow");
            case CRON: return calculateCronExpression(timeExpression);
            case FIX_RATE: return calculateFixRate(timeExpression);
            case FIX_DELAY: return Lists.newArrayList("VALID: depends on execution cost time");
        }
        // impossible
        return Collections.emptyList();
    }

    private static List<String> calculateFixRate(String timeExpression) {
        List<String> result = Lists.newArrayList();
        long delay = Long.parseLong(timeExpression);
        for (int i = 0; i < NEXT_N_TIMES; i++) {
            long nextTime = System.currentTimeMillis() + i * delay;
            result.add(DateFormatUtils.format(nextTime, OmsConstant.TIME_PATTERN));
        }
        return result;
    }

    private static List<String> calculateCronExpression(String expression) throws ParseException {
        CronExpression cronExpression = new CronExpression(expression);
        List<String> result = Lists.newArrayList();
        Date time = new Date();
        for (int i = 0; i < NEXT_N_TIMES; i++) {
            Date nextValidTime = cronExpression.getNextValidTimeAfter(time);
            if (nextValidTime == null) {
                break;
            }
            result.add(DateFormatUtils.format(nextValidTime.getTime(), OmsConstant.TIME_PATTERN));
            time = nextValidTime;
        }
        if (result.isEmpty()) {
            result.add("INVALID: no next valid schedule time");
        }
        return result;
    }
}
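The FIX_RATE branch above simply projects the current time forward in steps of the configured interval. The same arithmetic, made self-contained (JDK `SimpleDateFormat` stands in for the project's `OmsConstant.TIME_PATTERN` and commons-lang helpers; the pattern below is an assumption):

```java
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class FixRatePreview {

    private static final int NEXT_N_TIMES = 5;
    // assumed display pattern; the real one lives in OmsConstant.TIME_PATTERN
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    // Mirrors calculateFixRate: the i-th preview time is now + i * delay.
    static List<String> calculateFixRate(String timeExpression) {
        long delay = Long.parseLong(timeExpression);   // the expression is the rate in ms
        long now = System.currentTimeMillis();
        SimpleDateFormat fmt = new SimpleDateFormat(TIME_PATTERN);
        List<String> result = new ArrayList<>();
        for (int i = 0; i < NEXT_N_TIMES; i++) {
            result.add(fmt.format(new Date(now + i * delay)));
        }
        return result;
    }

    public static void main(String[] args) {
        // preview the next five runs of a 30-second fixed-rate job
        for (String t : calculateFixRate("30000")) {
            System.out.println(t);
        }
    }
}
```

Note that index 0 yields the current time itself; the console uses the list purely as a human-readable preview, so this is intentional.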
@ -18,10 +18,15 @@ public class InstanceTimeWheelService {

    private static final Map<Long, TimerFuture> CARGO = Maps.newConcurrentMap();

-   // precise scheduling time wheel, one tick per 1s
+   // precise scheduling time wheel, one tick per 1ms
    private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
+   // imprecise scheduling time wheel for long-delay tasks, one tick per 10s
+   private static final HashedWheelTimer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0);

    // minimum interval that supports cancellation; below this threshold the task is not put into CARGO
    private static final long MIN_INTERVAL_MS = 1000;
+   // long-delay threshold
+   private static final long LONG_DELAY_THRESHOLD_MS = 60000;

    /**
     * Timed scheduling.
@ -30,13 +35,17 @@ public class InstanceTimeWheelService {
     * @param timerTask the target method to execute
     */
    public static void schedule(Long uniqueId, Long delayMS, TimerTask timerTask) {
-       TimerFuture timerFuture = TIMER.schedule(() -> {
-           CARGO.remove(uniqueId);
-           timerTask.run();
-       }, delayMS, TimeUnit.MILLISECONDS);
-       if (delayMS > MIN_INTERVAL_MS) {
-           CARGO.put(uniqueId, timerFuture);
-       }
+       if (delayMS <= LONG_DELAY_THRESHOLD_MS) {
+           realSchedule(uniqueId, delayMS, timerTask);
+           return;
+       }
+
+       long expectTriggerTime = System.currentTimeMillis() + delayMS;
+       TimerFuture longDelayTask = SLOW_TIMER.schedule(() -> {
+           CARGO.remove(uniqueId);
+           realSchedule(uniqueId, expectTriggerTime - System.currentTimeMillis(), timerTask);
+       }, delayMS - LONG_DELAY_THRESHOLD_MS, TimeUnit.MILLISECONDS);
+       CARGO.put(uniqueId, longDelayTask);
    }

    /**
@ -48,4 +57,15 @@ public class InstanceTimeWheelService {
        return CARGO.get(uniqueId);
    }

+   private static void realSchedule(Long uniqueId, Long delayMS, TimerTask timerTask) {
+       TimerFuture timerFuture = TIMER.schedule(() -> {
+           CARGO.remove(uniqueId);
+           timerTask.run();
+       }, delayMS, TimeUnit.MILLISECONDS);
+       if (delayMS > MIN_INTERVAL_MS) {
+           CARGO.put(uniqueId, timerFuture);
+       }
+   }
+
}
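The two-wheel arrangement above parks long delays on a coarse wheel and only moves them onto the precise wheel inside the final window, keeping the precise wheel small. A minimal sketch of that hand-off, with a `ScheduledExecutorService` standing in for the two hashed wheels and the 60-second threshold shrunk so it runs in milliseconds:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TwoTierScheduleDemo {

    // 60s in InstanceTimeWheelService; shrunk to 50ms so the demo finishes quickly.
    static final long LONG_DELAY_THRESHOLD_MS = 50;

    // Short delays go straight to the precise tier; long delays wait out the bulk
    // of the interval first, then are re-scheduled precisely near the target time.
    static void schedule(ScheduledExecutorService pool, long delayMs, Runnable task) {
        if (delayMs <= LONG_DELAY_THRESHOLD_MS) {
            pool.schedule(task, delayMs, TimeUnit.MILLISECONDS);   // precise tier
            return;
        }
        long target = System.currentTimeMillis() + delayMs;        // remember the target,
        pool.schedule(                                             // coarse-tier hand-off:
                () -> schedule(pool, target - System.currentTimeMillis(), task),
                delayMs - LONG_DELAY_THRESHOLD_MS, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        CountDownLatch done = new CountDownLatch(1);
        long start = System.currentTimeMillis();
        schedule(pool, 120, () -> {   // > threshold: passes through both tiers
            System.out.println("fired after ~" + (System.currentTimeMillis() - start) + " ms");
            done.countDown();
        });
        done.await(2, TimeUnit.SECONDS);
        pool.shutdown();
    }
}
```

Recomputing the remaining delay from the original target time on hand-off (rather than assuming exactly `LONG_DELAY_THRESHOLD_MS` remains) is what keeps the imprecision of the coarse tier from accumulating into the final trigger time.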
@ -1,5 +1,7 @@
package com.github.kfcfans.powerjob.server.service.timing;

+import com.github.kfcfans.powerjob.common.InstanceStatus;
+import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
@ -128,7 +130,7 @@ public class CleanService {
        }
        try {
            Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
-           int num = instanceInfoRepository.deleteAllByGmtModifiedBefore(t);
+           int num = instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, InstanceStatus.finishedStatus);
            log.info("[CleanService] deleted {} instanceInfo records whose modify time before {}.", num, t);
        } catch (Exception e) {
            log.warn("[CleanService] clean instanceInfo failed.", e);
@ -142,7 +144,7 @@ public class CleanService {
        }
        try {
            Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
-           int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBefore(t);
+           int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, WorkflowInstanceStatus.finishedStatus);
            log.info("[CleanService] deleted {} workflow instanceInfo records whose modify time before {}.", num, t);
        } catch (Exception e) {
            log.warn("[CleanService] clean workflow instanceInfo failed.", e);
@ -162,7 +162,7 @@ public class InstanceStatusCheckService {
        // retry workflow instances stuck in WAITING status for too long
        long threshold = System.currentTimeMillis() - WORKFLOW_WAITING_TIMEOUT_MS;
        Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> {
-           List<WorkflowInstanceInfoDO> waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, WorkflowInstanceStatus.WAITING.getV(), new Date(threshold));
+           List<WorkflowInstanceInfoDO> waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, WorkflowInstanceStatus.WAITING.getV(), threshold);
            if (!CollectionUtils.isEmpty(waitingWfInstanceList)) {

                List<Long> wfInstanceIds = waitingWfInstanceList.stream().map(WorkflowInstanceInfoDO::getWfInstanceId).collect(Collectors.toList());
@@ -193,7 +193,7 @@ public class OmsScheduleService {
         wfInfos.forEach(wfInfo -> {

             // 1. Create the scheduling record first, so the trigger can never be silently dropped
-            Long wfInstanceId = workflowInstanceManager.create(wfInfo, null);
+            Long wfInstanceId = workflowInstanceManager.create(wfInfo, null, wfInfo.getNextTriggerTime());

             // 2. Push it into the time wheel, ready for scheduled execution
             long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis();
@@ -220,6 +220,9 @@ public class OmsScheduleService {
         try {
             // Query all frequent (second-level) jobs, IDs only
             List<Long> jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.frequentTypes);
+            if (CollectionUtils.isEmpty(jobIds)) {
+                return;
+            }
             // Check the instance log table for related running instances
             List<Long> runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.generalizedRunningStatus);
             Set<Long> runningJobIdSet = Sets.newHashSet(runningJobIdList);
@@ -68,9 +68,10 @@ public class WorkflowInstanceManager {
      * Create a workflow instance
      * @param wfInfo workflow metadata (description)
      * @param initParams initial parameters
+     * @param expectTriggerTime expected trigger time
      * @return wfInstanceId
      */
-    public Long create(WorkflowInfoDO wfInfo, String initParams) {
+    public Long create(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime) {

         Long wfId = wfInfo.getId();
         Long wfInstanceId = idGenerateService.allocate();
@@ -82,6 +83,7 @@ public class WorkflowInstanceManager {
         newWfInstance.setWfInstanceId(wfInstanceId);
         newWfInstance.setWorkflowId(wfId);
         newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
+        newWfInstance.setExpectedTriggerTime(expectTriggerTime);
         newWfInstance.setActualTriggerTime(System.currentTimeMillis());
         newWfInstance.setWfInitParams(initParams);
@@ -129,7 +131,7 @@ public class WorkflowInstanceManager {
         // Concurrency control
         int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.generalizedRunningStatus);
         if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) {
-            onWorkflowInstanceFailed(String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), wfInstanceInfo);
+            onWorkflowInstanceFailed(String.format(SystemInstanceResult.TOO_MANY_INSTANCES, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), wfInstanceInfo);
             return;
         }
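The concurrency-control hunk above counts running instances for a workflow and fails the new instance when the cap is exceeded. A minimal sketch of that admission check, assuming hypothetical names (`maxWfInstanceNum`, the message text) rather than PowerJob's real constants:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative admission guard mirroring the concurrency-control hunk: count
// first, reject with a formatted message when over the cap. Not PowerJob's API.
class ConcurrencySketch {
    static final String TOO_MANY_INSTANCES = "too many instances (%d > %d)";
    final int maxWfInstanceNum;
    final AtomicInteger running = new AtomicInteger();

    ConcurrencySketch(int maxWfInstanceNum) {
        this.maxWfInstanceNum = maxWfInstanceNum;
    }

    /** Returns null when admitted, or a failure message when over the cap. */
    String tryAdmit() {
        int concurrency = running.incrementAndGet(); // count includes this instance
        if (concurrency > maxWfInstanceNum) {
            running.decrementAndGet();
            return String.format(TOO_MANY_INSTANCES, concurrency, maxWfInstanceNum);
        }
        return null;
    }

    public static void main(String[] args) {
        ConcurrencySketch guard = new ConcurrencySketch(1);
        System.out.println(guard.tryAdmit() == null); // first instance admitted
        System.out.println(guard.tryAdmit());         // second rejected with message
    }
}
```

Note the `>` comparison: because the count already includes the instance being admitted, exactly `maxWfInstanceNum` concurrent instances are allowed.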
@@ -168,7 +168,7 @@ public class WorkflowService {

     private Long realRunWorkflow(WorkflowInfoDO wfInfo, String initParams, long delay) {
         log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay);
-        Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams);
+        Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay);
         if (delay <= 0) {
             workflowInstanceManager.start(wfInfo, wfInstanceId, initParams);
         } else {
@@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.web;
 import com.github.kfcfans.powerjob.common.PowerJobException;
 import com.github.kfcfans.powerjob.common.response.ResultDTO;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.springframework.http.converter.HttpMessageNotReadableException;
 import org.springframework.messaging.handler.annotation.support.MethodArgumentTypeMismatchException;
 import org.springframework.web.HttpRequestMethodNotSupportedException;
@@ -34,6 +35,6 @@ public class ControllerExceptionHandler {
         } else {
             log.error("[ControllerException] http request failed.", e);
         }
-        return ResultDTO.failed(e.getMessage());
+        return ResultDTO.failed(ExceptionUtils.getMessage(e));
     }
 }
@@ -0,0 +1,32 @@
+package com.github.kfcfans.powerjob.server.web.controller;
+
+import com.github.kfcfans.powerjob.common.TimeExpressionType;
+import com.github.kfcfans.powerjob.common.response.ResultDTO;
+import com.github.kfcfans.powerjob.server.service.ValidateService;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+/**
+ * Validation controller
+ *
+ * @author tjq
+ * @since 2020/11/28
+ */
+@RestController
+@RequestMapping("/validate")
+public class ValidateController {
+
+    @GetMapping("/timeExpression")
+    public ResultDTO<List<String>> checkTimeExpression(TimeExpressionType timeExpressionType, String timeExpression) {
+        try {
+            return ResultDTO.success(ValidateService.calculateNextTriggerTime(timeExpressionType, timeExpression));
+        } catch (Exception e) {
+            return ResultDTO.success(Lists.newArrayList(ExceptionUtils.getMessage(e)));
+        }
+    }
+}
@@ -32,6 +32,8 @@ public class WorkflowInstanceInfoVO {
     private PEWorkflowDAG pEWorkflowDAG;
     private String result;

+    // Expected trigger time
+    private String expectedTriggerTime;
     // Actual trigger time (formatted so humans can read it)
     private String actualTriggerTime;
     // Finish time (formatted likewise)
@@ -49,6 +51,9 @@ public class WorkflowInstanceInfoVO {
         vo.setWorkflowId(String.valueOf(wfInstanceDO.getWorkflowId()));

         // Format the timestamps
+        if (wfInstanceDO.getExpectedTriggerTime() != null) {
+            vo.setExpectedTriggerTime(DateFormatUtils.format(wfInstanceDO.getExpectedTriggerTime(), OmsConstant.TIME_PATTERN));
+        }
         vo.setActualTriggerTime(DateFormatUtils.format(wfInstanceDO.getActualTriggerTime(), OmsConstant.TIME_PATTERN));
         if (wfInstanceDO.getFinishedTime() == null) {
             vo.setFinishedTime(OmsConstant.NONE);
@@ -2,18 +2,17 @@
 <!-- Production environment logging -->
 <configuration>

     <!-- Default configuration -->
     <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
     <!-- Console appender configuration -->
     <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>

     <!--
         Log path. Mind file permissions, or nothing gets written.
         Pitfall: `~/logs` does not create a folder under the user's home directory; it creates a folder literally named ~ under the project directory.
     -->
     <property name="LOG_PATH" value="${user.home}/powerjob-server/logs"/>

     <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
         <encoder>
             <pattern>${CONSOLE_LOG_PATTERN}</pattern>
             <charset>utf8</charset>
         </encoder>
     </appender>

     <!-- Duplicate all system ERROR logs: start -->
     <appender name="ERROR_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
File diff suppressed because one or more lines are too long
@@ -0,0 +1,126 @@
+package com.github.kfcfans.powerjob.server.test;
+
+import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTimer;
+import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
+import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerTask;
+import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Time wheel tests
+ *
+ * @author tjq
+ * @since 2020/11/28
+ */
+@Slf4j
+public class HashedWheelTimerTest {
+
+    @Test
+    public void testHashedWheelTimer() throws Exception {
+
+        HashedWheelTimer timer = new HashedWheelTimer(1, 1024, 32);
+        List<TimerFuture> futures = Lists.newLinkedList();
+
+        for (int i = 0; i < 1000; i++) {
+
+            String name = "Task" + i;
+            long nowMS = System.currentTimeMillis();
+            int delayMS = ThreadLocalRandom.current().nextInt(60000);
+            long targetTime = delayMS + nowMS;
+
+            TimerTask timerTask = () -> {
+                System.out.println("============= " + name + "============= ");
+                System.out.println("ThreadInfo:" + Thread.currentThread().getName());
+                System.out.println("expectTime:" + targetTime);
+                System.out.println("currentTime:" + System.currentTimeMillis());
+                System.out.println("deviation:" + (System.currentTimeMillis() - targetTime));
+                System.out.println("============= " + name + "============= ");
+            };
+            futures.add(timer.schedule(timerTask, delayMS, TimeUnit.MILLISECONDS));
+        }
+
+        // Cancel at random
+        futures.forEach(future -> {
+
+            int x = ThreadLocalRandom.current().nextInt(2);
+            if (x == 1) {
+                future.cancel();
+            }
+
+        });
+
+        Thread.sleep(1000);
+
+        // Shut down
+        System.out.println(timer.stop().size());
+        System.out.println("Finished!");
+
+        Thread.sleep(277777777);
+    }
+
+    @Test
+    public void testPerformance() throws Exception {
+        Stopwatch sw = Stopwatch.createStarted();
+        for (long i = 0; i < 10000000; i++) {
+            long delay = ThreadLocalRandom.current().nextLong(100, 120000);
+            long expect = System.currentTimeMillis() + delay;
+            InstanceTimeWheelService.schedule(i, delay, () -> {
+                log.info("[Performance] deviation:{}", (System.currentTimeMillis() - expect));
+            });
+        }
+        log.info("[Performance] insert cost: {}", sw);
+
+        Thread.sleep(90000);
+    }
+
+    @Test
+    public void testLongDelayTask() throws Exception {
+        for (long i = 0; i < 1000000; i++) {
+            long delay = ThreadLocalRandom.current().nextLong(60000, 60000 * 3);
+            long expect = System.currentTimeMillis() + delay;
+            InstanceTimeWheelService.schedule(i, delay, () -> {
+                log.info("[LongDelayTask] deviation: {}", (System.currentTimeMillis() - expect));
+            });
+        }
+
+        Thread.sleep(60000 * 4);
+    }
+
+    @Test
+    public void testCancelDelayTask() throws Exception {
+
+        AtomicLong executeNum = new AtomicLong();
+        AtomicLong cancelNum = new AtomicLong();
+        for (long i = 0; i < 1000000; i++) {
+            long delay = ThreadLocalRandom.current().nextLong(60000, 60000 * 2);
+            long expect = System.currentTimeMillis() + delay;
+            InstanceTimeWheelService.schedule(i, delay, () -> {
+                executeNum.incrementAndGet();
+                log.info("[CancelLongDelayTask] deviation: {}", (System.currentTimeMillis() - expect));
+            });
+        }
+
+        Thread.sleep(10000);
+
+        for (long i = 0; i < 1000000; i++) {
+            boolean nextBoolean = ThreadLocalRandom.current().nextBoolean();
+            if (nextBoolean) {
+                continue;
+            }
+            boolean cancel = InstanceTimeWheelService.fetchTimerFuture(i).cancel();
+            log.info("[CancelLongDelayTask] id:{},status:{}", i, cancel);
+            cancelNum.incrementAndGet();
+        }
+
+        Thread.sleep(60000 * 4);
+        log.info("[CancelLongDelayTask] result -> executeNum:{},cancelNum:{}", executeNum, cancelNum);
+    }
+}
@@ -1,6 +1,8 @@
 package com.github.kfcfans.powerjob.server.test;

+import com.github.kfcfans.powerjob.common.InstanceStatus;
 import com.github.kfcfans.powerjob.common.TimeExpressionType;
+import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
 import com.github.kfcfans.powerjob.common.utils.NetUtils;
 import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
 import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
@@ -104,12 +106,12 @@ public class RepositoryTest {

     @Test
     public void testDeleteInstanceInfo() {
-        instanceInfoRepository.deleteAllByGmtModifiedBefore(new Date());
+        instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), InstanceStatus.finishedStatus);
     }

     @Test
     public void testDeleteWorkflowInstanceInfo() {
-        workflowInstanceInfoRepository.deleteAllByGmtModifiedBefore(new Date());
+        workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), WorkflowInstanceStatus.finishedStatus);
     }

 }
@@ -24,49 +24,6 @@ import java.util.stream.Collectors;
  */
 public class UtilsTest {

-    @Test
-    public void testHashedWheelTimer() throws Exception {
-
-        HashedWheelTimer timer = new HashedWheelTimer(1, 1024, 32);
-        List<TimerFuture> futures = Lists.newLinkedList();
-
-        for (int i = 0; i < 1000; i++) {
-
-            String name = "Task" + i;
-            long nowMS = System.currentTimeMillis();
-            int delayMS = ThreadLocalRandom.current().nextInt(60000);
-            long targetTime = delayMS + nowMS;
-
-            TimerTask timerTask = () -> {
-                System.out.println("============= " + name + "============= ");
-                System.out.println("ThreadInfo:" + Thread.currentThread().getName());
-                System.out.println("expectTime:" + targetTime);;
-                System.out.println("currentTime:" + System.currentTimeMillis());
-                System.out.println("deviation:" + (System.currentTimeMillis() - targetTime));
-                System.out.println("============= " + name + "============= ");
-            };
-            futures.add(timer.schedule(timerTask, delayMS, TimeUnit.MILLISECONDS));
-        }
-
-        // Cancel at random
-        futures.forEach(future -> {
-
-            int x = ThreadLocalRandom.current().nextInt(2);
-            if (x == 1) {
-                future.cancel();
-            }
-
-        });
-
-        Thread.sleep(1000);
-
-        // Shut down
-        System.out.println(timer.stop().size());
-        System.out.println("Finished!");
-
-        Thread.sleep(277777777);
-    }
-
     @Test
     public void testCronExpression() throws Exception {
         String cron = "0 * * * * ? *";
19
powerjob-server/src/test/resources/logback-test.xml
Normal file
@@ -0,0 +1,19 @@
+<?xml version="1.0"?>
+<!-- Production environment logging -->
+<configuration>
+
+    <property name="CONSOLE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %m%n"/>
+
+    <!-- Console output settings -->
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
+            <charset>utf8</charset>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+    </root>
+
+</configuration>
@@ -10,12 +10,12 @@

     <modelVersion>4.0.0</modelVersion>
     <artifactId>powerjob-worker-agent</artifactId>
-    <version>3.3.3</version>
+    <version>3.4.0</version>
     <packaging>jar</packaging>

     <properties>
-        <powerjob.worker.version>3.3.3</powerjob.worker.version>
+        <powerjob.worker.version>3.4.0</powerjob.worker.version>
         <logback.version>1.2.3</logback.version>
         <picocli.version>4.3.2</picocli.version>

@@ -10,11 +10,11 @@
     <modelVersion>4.0.0</modelVersion>

     <artifactId>powerjob-worker-samples</artifactId>
-    <version>3.3.3</version>
+    <version>3.4.0</version>

     <properties>
         <springboot.version>2.2.6.RELEASE</springboot.version>
-        <powerjob.worker.starter.version>3.3.3</powerjob.worker.starter.version>
+        <powerjob.worker.starter.version>3.4.0</powerjob.worker.starter.version>
         <fastjson.version>1.2.68</fastjson.version>

         <!-- Skip this module during deployment -->

@@ -10,11 +10,11 @@

     <modelVersion>4.0.0</modelVersion>
     <artifactId>powerjob-worker-spring-boot-starter</artifactId>
-    <version>3.3.3</version>
+    <version>3.4.0</version>
     <packaging>jar</packaging>

     <properties>
-        <powerjob.worker.version>3.3.3</powerjob.worker.version>
+        <powerjob.worker.version>3.4.0</powerjob.worker.version>
         <springboot.version>2.2.6.RELEASE</springboot.version>
     </properties>

@@ -10,12 +10,12 @@

     <modelVersion>4.0.0</modelVersion>
     <artifactId>powerjob-worker</artifactId>
-    <version>3.3.3</version>
+    <version>3.4.0</version>
     <packaging>jar</packaging>

     <properties>
         <spring.version>5.2.4.RELEASE</spring.version>
-        <powerjob.common.version>3.3.3</powerjob.common.version>
+        <powerjob.common.version>3.4.0</powerjob.common.version>
         <h2.db.version>1.4.200</h2.db.version>
         <hikaricp.version>3.4.2</hikaricp.version>
         <junit.version>5.6.1</junit.version>