Merge branch 'v3.1.3' into jenkins_auto_build

This commit is contained in:
tjq 2020-07-01 20:36:05 +08:00
commit 65e447a2e4
75 changed files with 790 additions and 462 deletions

View File

@ -22,11 +22,21 @@ PowerJob原OhMyScheduler是全新一代分布式调度与计算框架
* 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。
* 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。
[在线试用地址](https://www.yuque.com/powerjob/guidence/hnbskn)
### 适用场景
* 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。
* 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
* 有需要分布式处理的业务场景比如需要更新一大批数据单机执行耗时非常长可以使用Map/MapReduce处理器完成任务的分发调动整个集群加速计算。
* 有需要延迟执行某些任务的业务场景:比如订单过期处理等。
### 设计目标
PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。
### 在线试用
试用地址:[try.powerjob.tech](http://try.powerjob.tech/)
试用应用名称powerjob-agent-test
控制台密码123
[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn)
### 同类产品对比
| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob |
@ -43,7 +53,9 @@ PowerJob原OhMyScheduler是全新一代分布式调度与计算框架
# 文档
**[超详细中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** OR **[备用地址(内容可能更新不及时)](https://kfcfans.github.io/)**
**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)**
**[Document](https://www.yuque.com/powerjob/en/xrdoqw)**
PS感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献!

View File

@ -1,13 +1,15 @@
<p align="center">
<img src="https://raw.githubusercontent.com/KFCFans/OhMyScheduler/master/others/images/oms-logo.png" alt="OhMyScheduler" title="OhMyScheduler" width="557"/>
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/logo.png" alt="PowerJob" title="PowerJob" width="557"/>
</p>
<p align="center">
<a href="https://github.com/KFCFans/OhMyScheduler/actions"><img src="https://github.com/KFCFans/OhMyScheduler/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master"></a>
<a href="https://github.com/KFCFans/OhMyScheduler/blob/master/LICENSE"><img src="https://img.shields.io/github/license/KFCFans/OhMyScheduler"></a>
<a href="https://github.com/KFCFans/PowerJob/actions"><img src="https://github.com/KFCFans/PowerJob/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master" alt="actions"></a>
<a href="https://search.maven.org/search?q=com.github.kfcfans"><img alt="Maven Central" src="https://img.shields.io/maven-central/v/com.github.kfcfans/powerjob-worker"></a>
<a href="https://github.com/KFCFans/PowerJob/releases"><img alt="GitHub release (latest SemVer)" src="https://img.shields.io/github/v/release/kfcfans/powerjob?color=%23E59866"></a>
<a href="https://github.com/KFCFans/PowerJob/blob/master/LICENSE"><img src="https://img.shields.io/github/license/KFCFans/PowerJob" alt="LICENSE"></a>
</p>
OhMyScheduler is a powerful distributed scheduling platform and distributed computing framework based on Akka architecture.It provides you a chance to schedule job and distributed computing easily.
PowerJob is a powerful distributed scheduling platform and distributed computing framework based on Akka architecture.It provides you a chance to schedule job and distributed computing easily.
# Introduction
@ -30,7 +32,7 @@ OhMyScheduler is a powerful distributed scheduling platform and distributed comp
### Comparison of similar products
| | QuartZ | xxl-job | SchedulerX 2.0 | OhMyScheduler |
| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob |
| ---------------------------------- | --------------------------------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
| Timing type | CRON | CRON | CRON, fixed frequency, fixed delay, OpenAPI | **CRON, fixed frequency, fixed delay, OpenAPI** |
| Task type | Built-in Java | Built-in Java, GLUE Java, Shell, Python and other scripts | Built-in Java, external Java (FatJar), Shell, Python and other scripts | **Built-in Java, external Java (container), Shell, Python and other scripts** |
@ -43,9 +45,9 @@ OhMyScheduler is a powerful distributed scheduling platform and distributed comp
| workflow | not support | not support | support | **support** |
# Document
**[GitHub Wiki](https://github.com/KFCFans/OhMyScheduler/wiki)**
**[GitHub Wiki](https://github.com/KFCFans/PowerJob/wiki)**
**[中文文档](https://www.yuque.com/ohmyscheduler/product)**
**[中文文档](https://www.yuque.com/powerjob/product)**
# Others

View File

@ -2,14 +2,14 @@
Navicat Premium Data Transfer
Source Server Type : MySQL
Source Server Version : 50724
Source Schema : oms-product
Source Server Version : 80020
Source Schema : powerjob-product
Target Server Type : MySQL
Target Server Version : 50724
Target Server Version : 80020
File Encoding : 65001
Date: 07/06/2020 11:11:47
Date: 23/06/2020 22:30:06
*/
SET NAMES utf8mb4;
@ -20,130 +20,131 @@ SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
DROP TABLE IF EXISTS `app_info`;
CREATE TABLE `app_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`id` bigint NOT NULL AUTO_INCREMENT,
`app_name` varchar(255) DEFAULT NULL,
`current_server` varchar(255) DEFAULT NULL,
`description` varchar(255) DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
`password` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `appNameUK` (`app_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for container_info
-- ----------------------------
DROP TABLE IF EXISTS `container_info`;
CREATE TABLE `container_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`app_id` bigint(20) DEFAULT NULL,
`id` bigint NOT NULL AUTO_INCREMENT,
`app_id` bigint DEFAULT NULL,
`container_name` varchar(255) DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
`last_deploy_time` datetime(6) DEFAULT NULL,
`source_info` varchar(255) DEFAULT NULL,
`source_type` int(11) DEFAULT NULL,
`status` int(11) DEFAULT NULL,
`source_type` int DEFAULT NULL,
`status` int DEFAULT NULL,
`version` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `IDX8hixyaktlnwil2w9up6b0p898` (`app_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for instance_info
-- ----------------------------
DROP TABLE IF EXISTS `instance_info`;
CREATE TABLE `instance_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`actual_trigger_time` bigint(20) DEFAULT NULL,
`app_id` bigint(20) DEFAULT NULL,
`expected_trigger_time` bigint(20) DEFAULT NULL,
`finished_time` bigint(20) DEFAULT NULL,
`id` bigint NOT NULL AUTO_INCREMENT,
`actual_trigger_time` bigint DEFAULT NULL,
`app_id` bigint DEFAULT NULL,
`expected_trigger_time` bigint DEFAULT NULL,
`finished_time` bigint DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
`instance_id` bigint(20) DEFAULT NULL,
`instance_id` bigint DEFAULT NULL,
`instance_params` text,
`job_id` bigint(20) DEFAULT NULL,
`job_id` bigint DEFAULT NULL,
`last_report_time` bigint DEFAULT NULL,
`result` text,
`running_times` bigint(20) DEFAULT NULL,
`status` int(11) DEFAULT NULL,
`running_times` bigint DEFAULT NULL,
`status` int DEFAULT NULL,
`task_tracker_address` varchar(255) DEFAULT NULL,
`type` int(11) DEFAULT NULL,
`wf_instance_id` bigint(20) DEFAULT NULL,
`type` int DEFAULT NULL,
`wf_instance_id` bigint DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `IDX5b1nhpe5je7gc5s1ur200njr7` (`job_id`),
KEY `IDXjnji5lrr195kswk6f7mfhinrs` (`app_id`),
KEY `IDXa98hq3yu0l863wuotdjl7noum` (`instance_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for job_info
-- ----------------------------
DROP TABLE IF EXISTS `job_info`;
CREATE TABLE `job_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`app_id` bigint(20) DEFAULT NULL,
`concurrency` int(11) DEFAULT NULL,
`id` bigint NOT NULL AUTO_INCREMENT,
`app_id` bigint DEFAULT NULL,
`concurrency` int DEFAULT NULL,
`designated_workers` varchar(255) DEFAULT NULL,
`execute_type` int(11) DEFAULT NULL,
`execute_type` int DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
`instance_retry_num` int(11) DEFAULT NULL,
`instance_time_limit` bigint(20) DEFAULT NULL,
`instance_retry_num` int DEFAULT NULL,
`instance_time_limit` bigint DEFAULT NULL,
`job_description` varchar(255) DEFAULT NULL,
`job_name` varchar(255) DEFAULT NULL,
`job_params` varchar(255) DEFAULT NULL,
`max_instance_num` int(11) DEFAULT NULL,
`max_worker_count` int(11) DEFAULT NULL,
`max_instance_num` int DEFAULT NULL,
`max_worker_count` int DEFAULT NULL,
`min_cpu_cores` double NOT NULL,
`min_disk_space` double NOT NULL,
`min_memory_space` double NOT NULL,
`next_trigger_time` bigint(20) DEFAULT NULL,
`next_trigger_time` bigint DEFAULT NULL,
`notify_user_ids` varchar(255) DEFAULT NULL,
`processor_info` text,
`processor_type` int(11) DEFAULT NULL,
`status` int(11) DEFAULT NULL,
`task_retry_num` int(11) DEFAULT NULL,
`processor_type` int DEFAULT NULL,
`status` int DEFAULT NULL,
`task_retry_num` int DEFAULT NULL,
`time_expression` varchar(255) DEFAULT NULL,
`time_expression_type` int(11) DEFAULT NULL,
`time_expression_type` int DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `IDXk2xprmn3lldmlcb52i36udll1` (`app_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for oms_lock
-- ----------------------------
DROP TABLE IF EXISTS `oms_lock`;
CREATE TABLE `oms_lock` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`id` bigint NOT NULL AUTO_INCREMENT,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
`lock_name` varchar(255) DEFAULT NULL,
`max_lock_time` bigint(20) DEFAULT NULL,
`max_lock_time` bigint DEFAULT NULL,
`ownerip` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `lockNameUK` (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for server_info
-- ----------------------------
DROP TABLE IF EXISTS `server_info`;
CREATE TABLE `server_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`id` bigint NOT NULL AUTO_INCREMENT,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
`ip` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `UKtk8ytgpl7mpukhnvhbl82kgvy` (`ip`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for user_info
-- ----------------------------
DROP TABLE IF EXISTS `user_info`;
CREATE TABLE `user_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`id` bigint NOT NULL AUTO_INCREMENT,
`email` varchar(255) DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
@ -151,47 +152,47 @@ CREATE TABLE `user_info` (
`phone` varchar(255) DEFAULT NULL,
`username` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for workflow_info
-- ----------------------------
DROP TABLE IF EXISTS `workflow_info`;
CREATE TABLE `workflow_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`app_id` bigint(20) DEFAULT NULL,
`id` bigint NOT NULL AUTO_INCREMENT,
`app_id` bigint DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
`max_wf_instance_num` int(11) DEFAULT NULL,
`next_trigger_time` bigint(20) DEFAULT NULL,
`max_wf_instance_num` int DEFAULT NULL,
`next_trigger_time` bigint DEFAULT NULL,
`notify_user_ids` varchar(255) DEFAULT NULL,
`pedag` text,
`status` int(11) DEFAULT NULL,
`status` int DEFAULT NULL,
`time_expression` varchar(255) DEFAULT NULL,
`time_expression_type` int(11) DEFAULT NULL,
`time_expression_type` int DEFAULT NULL,
`wf_description` varchar(255) DEFAULT NULL,
`wf_name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `IDX7uo5w0e3beeho3fnx9t7eiol3` (`app_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for workflow_instance_info
-- ----------------------------
DROP TABLE IF EXISTS `workflow_instance_info`;
CREATE TABLE `workflow_instance_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`actual_trigger_time` bigint(20) DEFAULT NULL,
`app_id` bigint(20) DEFAULT NULL,
`id` bigint NOT NULL AUTO_INCREMENT,
`actual_trigger_time` bigint DEFAULT NULL,
`app_id` bigint DEFAULT NULL,
`dag` text,
`finished_time` bigint(20) DEFAULT NULL,
`finished_time` bigint DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
`result` text,
`status` int(11) DEFAULT NULL,
`wf_instance_id` bigint(20) DEFAULT NULL,
`workflow_id` bigint(20) DEFAULT NULL,
`status` int DEFAULT NULL,
`wf_instance_id` bigint DEFAULT NULL,
`workflow_id` bigint DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
SET FOREIGN_KEY_CHECKS = 1;

View File

@ -62,8 +62,10 @@ if [ "$startup" = "y" ] || [ "$startup" = "Y" ]; then
echo "================== 准备启动 powerjob-server =================="
docker run -d \
--name powerjob-server \
-p 7700:7700 -p 10086:10086 \
-p 7700:7700 -p 10086:10086 -p 5001:5005 -p 10001:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
-e PARAMS="--spring.profiles.active=pre" \
-e TZ="Asia/Shanghai" \
-v ~/docker/powerjob-server:/root/powerjob-server -v ~/.m2:/root/.m2 \
tjqq/powerjob-server:$version
sleep 1
@ -74,8 +76,21 @@ if [ "$startup" = "y" ] || [ "$startup" = "Y" ]; then
serverIP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' powerjob-server)
serverAddress="$serverIP:7700"
echo "使用的Server地址$serverAddress"
docker run -d -e PARAMS="--app powerjob-agent-test --server $serverAddress" -p 27777:27777 --name powerjob-agent -v ~/docker/powerjob-agent:/root tjqq/powerjob-agent:$version
docker run -d -e PARAMS="--app powerjob-agent-test --server $serverAddress" -p 27778:27777 --name powerjob-agent2 -v ~/docker/powerjob-agent2:/root tjqq/powerjob-agent:$version
docker run -d \
--name powerjob-agent \
-p 27777:27777 -p 5002:5005 -p 10002:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
-e PARAMS="--app powerjob-agent-test --server $serverAddress" \
-v ~/docker/powerjob-agent:/root \
tjqq/powerjob-agent:$version
docker run -d \
--name powerjob-agent2 \
-p 27778:27777 -p 5003:5005 -p 10003:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005" \
-e PARAMS="--app powerjob-agent-test --server $serverAddress" \
-v ~/docker/powerjob-agent2:/root \
tjqq/powerjob-agent:$version
tail -f -n 100 ~/docker/powerjob-agent/powerjob/logs/powerjob-agent-application.log
fi

View File

@ -1,12 +0,0 @@
#!/bin/sh
# 一键部署脚本,请勿挪动脚本
cd `dirname $0`/../.. || exit
echo "================== 构建 jar =================="
mvn clean package -DskipTests -Pdev -e -U
echo "================== 拷贝 jar =================="
/bin/cp -rf powerjob-server/target/*.jar others/powerjob-server.jar
ls -l others/powerjob-server.jar
echo "================== debug 模式启动 =================="
nohup java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -jar others/powerjob-server.jar > powerjob-server.log &
sleep 100
tail --pid=$$ -f -n 1000 others/powerjob-server.log

View File

@ -1,37 +0,0 @@
#!/bin/bash
cd `dirname $0`/../.. || exit
echo "================== 构建 jar =================="
mvn clean package -Pdev -DskipTests -U -e -pl powerjob-server,powerjob-worker-agent -am
echo "================== 拷贝 jar =================="
/bin/cp -rf powerjob-server/target/*.jar powerjob-server/docker/powerjob-server.jar
/bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar
echo "================== 关闭老应用 =================="
docker stop powerjob-server
docker stop powerjob-agent
docker stop powerjob-agent2
echo "================== 删除老容器 =================="
docker container rm powerjob-server
docker container rm powerjob-agent
docker container rm powerjob-agent2
echo "================== 删除旧镜像 =================="
docker rmi -f tjqq/powerjob-server:latest
docker rmi -f tjqq/powerjob-agent:latest
echo "================== 构建 powerjob-server 镜像 =================="
docker build -t tjqq/powerjob-server:latest powerjob-server/docker/. || exit
echo "================== 构建 powerjob-agent 镜像 =================="
docker build -t tjqq/powerjob-agent:latest powerjob-worker-agent/. || exit
echo "================== 准备启动 powerjob-server =================="
docker run -d \
--name powerjob-server \
-p 7700:7700 -p 10086:10086 \
-e PARAMS="--spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://172.27.147.252:3306/oms-product?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8 --spring.data.mongodb.uri=mongodb://172.27.147.252:27017/oms-product" \
-v ~/docker/powerjob-server:/root/powerjob-server -v ~/.m2:/root/.m2 \
tjqq/powerjob-server:latest
sleep 60
echo "================== 准备启动 powerjob-client =================="
serverIP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' powerjob-server)
serverAddress="$serverIP:7700"
echo "使用的Server地址$serverAddress"
docker run -d -e PARAMS="--app powerjob-agent-test --server $serverAddress" -p 27777:27777 --name powerjob-agent -v ~/docker/powerjob-agent:/root tjqq/powerjob-agent:latest
docker run -d -e PARAMS="--app powerjob-agent-test --server $serverAddress" -p 27778:27777 --name powerjob-agent2 -v ~/docker/powerjob-agent2:/root tjqq/powerjob-agent:latest

View File

@ -0,0 +1,55 @@
#!/bin/bash
cd `dirname $0`/../.. || exit
echo "================== 构建 jar =================="
mvn clean package -Pdev -DskipTests -U -e -pl powerjob-server,powerjob-worker-agent -am
echo "================== 拷贝 jar =================="
/bin/cp -rf powerjob-server/target/*.jar powerjob-server/docker/powerjob-server.jar
/bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar
echo "================== 关闭老应用 =================="
docker stop powerjob-server
docker stop powerjob-agent
docker stop powerjob-agent2
echo "================== 删除老容器 =================="
docker container rm powerjob-server
docker container rm powerjob-agent
docker container rm powerjob-agent2
echo "================== 删除旧镜像 =================="
docker rmi -f tjqq/powerjob-server:latest
docker rmi -f tjqq/powerjob-agent:latest
echo "================== 构建 powerjob-server 镜像 =================="
docker build -t tjqq/powerjob-server:latest powerjob-server/docker/. || exit
echo "================== 构建 powerjob-agent 镜像 =================="
docker build -t tjqq/powerjob-agent:latest powerjob-worker-agent/. || exit
echo "================== 准备启动 powerjob-server =================="
docker run -d \
--restart=always \
--name powerjob-server \
-p 7700:7700 -p 10086:10086 -p 5001:5005 -p 10001:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
-e PARAMS="--spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-product" \
-v ~/docker/powerjob-server:/root/powerjob-server -v ~/.m2:/root/.m2 \
tjqq/powerjob-server:latest
sleep 60
echo "================== 准备启动 powerjob-agent =================="
serverIP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' powerjob-server)
serverAddress="$serverIP:7700"
echo "使用的Server地址$serverAddress"
docker run -d \
--restart=always \
--name powerjob-agent \
-p 27777:27777 -p 5002:5005 -p 10002:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
-e PARAMS="--app powerjob-agent-test --server $serverAddress" \
-v ~/docker/powerjob-agent:/root \
tjqq/powerjob-agent:latest
docker run -d \
--restart=always \
--name powerjob-agent2 \
-p 27778:27777 -p 5003:5005 -p 10003:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
-e PARAMS="--app powerjob-agent-test --server $serverAddress" \
-v ~/docker/powerjob-agent2:/root \
tjqq/powerjob-agent:latest

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.1.0</version>
<version>3.1.3</version>
<packaging>jar</packaging>
<properties>
<powerjob.common.version>3.1.0</powerjob.common.version>
<powerjob.common.version>3.1.3</powerjob.common.version>
<junit.version>5.6.1</junit.version>
</properties>

View File

@ -66,9 +66,11 @@ public class OhMyClient {
appId = Long.parseLong(resultDTO.getData().toString());
currentAddress = addr;
break;
}else {
throw new OmsException(resultDTO.getMessage());
}
}
}catch (Exception ignore) {
}catch (IOException ignore) {
}
}
@ -173,13 +175,15 @@ public class OhMyClient {
* 运行某个任务
* @param jobId 任务ID
* @param instanceParams 任务实例的参数
* @param delayMS 延迟时间单位毫秒
* @return 任务实例IDinstanceId
* @throws Exception 异常
*/
public ResultDTO<Long> runJob(Long jobId, String instanceParams) throws Exception {
public ResultDTO<Long> runJob(Long jobId, String instanceParams, long delayMS) throws Exception {
FormBody.Builder builder = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString());
.add("appId", appId.toString())
.add("delay", String.valueOf(delayMS));
if (StringUtils.isNotEmpty(instanceParams)) {
builder.add("instanceParams", instanceParams);
@ -188,7 +192,7 @@ public class OhMyClient {
return JsonUtils.parseObject(post, ResultDTO.class);
}
public ResultDTO<Long> runJob(Long jobId) throws Exception {
return runJob(jobId, null);
return runJob(jobId, null, 0);
}
/* ************* Instance 区 ************* */

View File

@ -21,7 +21,7 @@ public class TestClient {
@BeforeAll
public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test2", null);
ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
}
@Test
@ -70,7 +70,7 @@ public class TestClient {
@Test
public void testRunJob() throws Exception {
System.out.println(ohMyClient.runJob(8L, "this is instanceParams"));
System.out.println(ohMyClient.runJob(6L, "this is instanceParams", 60000));
}
@Test

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.1.0</version>
<version>3.1.3</version>
<packaging>jar</packaging>
<properties>

View File

@ -43,7 +43,7 @@ public class InstanceDetail implements OmsSerializable {
private String startTime;
private String finishedTime;
private String result;
private String status;
private int status;
}
// MapReduce Broadcast 任务的 extra ->

View File

@ -15,5 +15,5 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
public class ServerDestroyContainerRequest implements OmsSerializable {
private String containerName;
private Long containerId;
}

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.common.request.http;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import lombok.Data;
import java.util.List;
@ -76,4 +77,14 @@ public class SaveJobInfoRequest {
// 报警用户ID列表
private List<Long> notifyUserIds;
public void valid() {
CommonUtils.requireNonNull(jobName, "jobName can't be empty");
CommonUtils.requireNonNull(appId, "appId can't be empty");
CommonUtils.requireNonNull(processorInfo, "processorInfo can't be empty");
CommonUtils.requireNonNull(executeType, "executeType can't be empty");
CommonUtils.requireNonNull(processorType, "processorType can't be empty");
CommonUtils.requireNonNull(timeExpressionType, "timeExpressionType can't be empty");
}
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.common.request.http;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.google.common.collect.Lists;
import lombok.Data;
@ -43,4 +44,11 @@ public class SaveWorkflowRequest {
// 工作流整体失败的报警
private List<Long> notifyUserIds = Lists.newLinkedList();
public void valid() {
CommonUtils.requireNonNull(wfName, "workflow name can't be empty");
CommonUtils.requireNonNull(appId, "appId can't be empty");
CommonUtils.requireNonNull(pEWorkflowDAG, "dag can't be empty");
CommonUtils.requireNonNull(timeExpressionType, "timeExpressionType can't be empty");
}
}

View File

@ -1,8 +1,11 @@
package com.github.kfcfans.powerjob.common.utils;
import com.github.kfcfans.powerjob.common.OmsException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;
@ -114,4 +117,16 @@ public class CommonUtils {
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
public static <T> T requireNonNull(T obj, String msg) {
if (obj == null) {
throw new OmsException(msg);
}
if (obj instanceof String) {
if (StringUtils.isEmpty((String) obj)) {
throw new OmsException(msg);
}
}
return obj;
}
}

View File

@ -1,5 +1,5 @@
# 基础镜像
FROM openjdk:8
# 基础镜像(支持 amd64 & arm64based on Ubuntu 18.04.4 LTS
FROM adoptopenjdk:8-jdk-hotspot
# 维护者
MAINTAINER tengjiqi@gmail.com
# 下载并安装 maven
@ -11,13 +11,14 @@ COPY settings.xml /opt/powerjob-maven/conf/settings.xml
# 设置 maven 环境变量maven invoker 读取该变量调用 maven
ENV M2_HOME=/opt/powerjob-maven
# 设置时区Debian专用方法
# 设置时区
ENV TZ=Asia/Shanghai
# 设置其他环境变量
ENV APP_NAME=powerjob-server
# 传递 SpringBoot 启动参数
# 传递 SpringBoot 启动参数 和 JVM参数
ENV PARAMS=""
ENV JVMOPTIONS=""
# 将应用 jar 包拷入 docker
COPY powerjob-server.jar /powerjob-server.jar
# 暴露端口HTTP + AKKA
@ -27,4 +28,4 @@ RUN mkdir -p /root/powerjob-server
# 挂载数据卷,将文件直接输出到宿主机(注意,此处挂载的是匿名卷,即在宿主机位置随机)
VOLUME /root/powerjob-server
# 启动应用
ENTRYPOINT ["sh","-c","java -jar /powerjob-server.jar $PARAMS"]
ENTRYPOINT ["sh","-c","java $JVMOPTIONS -jar /powerjob-server.jar $PARAMS"]

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.1.0</version>
<version>3.1.3</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.common.version>3.1.0</powerjob.common.version>
<powerjob.common.version>3.1.3</powerjob.common.version>
<mysql.version>8.0.19</mysql.version>
<h2.db.version>1.4.200</h2.db.version>
<zip4j.version>2.5.2</zip4j.version>

View File

@ -10,6 +10,7 @@ import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerReque
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository;
@ -91,8 +92,10 @@ public class ServerActor extends AbstractActor {
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(req.getContainerId());
AskResponse askResponse = new AskResponse();
askResponse.setSuccess(false);
if (containerInfoOpt.isPresent()) {
if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) {
askResponse.setSuccess(false);
askResponse.setMessage("can't find container by id: " + req.getContainerId());
}else {
ContainerInfoDO containerInfo = containerInfoOpt.get();
askResponse.setSuccess(true);
@ -104,7 +107,6 @@ public class ServerActor extends AbstractActor {
askResponse.setData(JsonUtils.toBytes(dpReq));
}
getSender().tell(askResponse, getSelf());
}

View File

@ -1,31 +0,0 @@
package com.github.kfcfans.powerjob.server.common.constans;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 容器状态
* 由于命名约束准备采取硬删除策略
*
* @author tjq
* @since 2020/5/15
*/
@Getter
@AllArgsConstructor
public enum ContainerStatus {
ENABLE(1),
DISABLE(2),
DELETED(99);
private int v;
public static ContainerStatus of(int v) {
for (ContainerStatus type : values()) {
if (type.v == v) {
return type;
}
}
throw new IllegalArgumentException("unknown ContainerStatus of " + v);
}
}

View File

@ -1,5 +1,19 @@
package com.github.kfcfans.powerjob.server.common.utils;
/*
Copyright [2020] [KFCFans]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import java.io.Serializable;
import java.text.ParseException;
import java.util.Calendar;

View File

@ -37,8 +37,6 @@ public class HashedWheelTimer implements Timer {
private final ExecutorService taskProcessPool;
private static final int MAXIMUM_CAPACITY = 1 << 30;
public HashedWheelTimer(long tickDuration, int ticksPerWheel) {
this(tickDuration, ticksPerWheel, 0);
}
@ -47,9 +45,9 @@ public class HashedWheelTimer implements Timer {
* 新建时间轮定时器
* @param tickDuration 时间间隔单位毫秒ms
* @param ticksPerWheel 轮盘个数
* @param processTaskNum 处理任务的线程个数0代表不启用新线程如果定时任务需要耗时操作请启用线程池
* @param processThreadNum 处理任务的线程个数0代表不启用新线程如果定时任务需要耗时操作请启用线程池
*/
public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processTaskNum) {
public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) {
this.tickDuration = tickDuration;
@ -62,12 +60,13 @@ public class HashedWheelTimer implements Timer {
mask = wheel.length - 1;
// 初始化执行线程池
if (processTaskNum <= 0) {
if (processThreadNum <= 0) {
taskProcessPool = null;
}else {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(16);
taskProcessPool = new ThreadPoolExecutor(2, processTaskNum,
int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
60, TimeUnit.SECONDS,
queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
}

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.server.common.utils.timewheel;
/**
* description
* TimerFuture
*
* @author tjq
* @since 2020/4/3

View File

@ -13,5 +13,5 @@ import java.util.List;
*/
public interface ContainerInfoRepository extends JpaRepository<ContainerInfoDO, Long> {
List<ContainerInfoDO> findByAppId(Long appId);
List<ContainerInfoDO> findByAppIdAndStatusNot(Long appId, Integer status);
}

View File

@ -68,7 +68,7 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
List<Long> findByJobIdInAndStatusIn(List<Long> jobIds, List<Integer> status);
// 删除历史数据JPA自带的删除居然是根据ID循环删2000条数据删了几秒也太拉垮了吧...
// 结果只能用 int
// 结果只能用 int
@Modifying
@Transactional
@Query(value = "delete from instance_info where gmt_modified < ?1", nativeQuery = true)

View File

@ -12,6 +12,7 @@ import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository;
@ -85,6 +86,9 @@ public class ContainerService {
* @param request 容器保存请求
*/
public void save(SaveContainerInfoRequest request) {
request.valid();
ContainerInfoDO container;
Long originId = request.getId();
if (originId != null) {
@ -120,15 +124,17 @@ public class ContainerService {
throw new RuntimeException("Permission Denied!");
}
ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getContainerName());
ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId());
WorkerManagerService.getActiveWorkerInfo(container.getAppId()).keySet().forEach(akkaAddress -> {
ActorSelection workerActor = OhMyServer.getWorkerActor(akkaAddress);
workerActor.tell(destroyRequest, null);
});
log.info("[ContainerService] delete container: {}.", container);
// 硬删除算了...留着好像也没什么用
containerInfoRepository.deleteById(containerId);
// 软删除
container.setStatus(SwitchableStatus.DELETED.getV());
container.setGmtModified(new Date());
containerInfoRepository.saveAndFlush(container);
}
/**

View File

@ -9,6 +9,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
import com.github.kfcfans.powerjob.server.service.instance.InstanceMetadataService;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@ -39,6 +40,8 @@ public class DispatchService {
@Resource
private InstanceManager instanceManager;
@Resource
private InstanceMetadataService instanceMetadataService;
@Resource
private InstanceInfoRepository instanceInfoRepository;
private static final Splitter commaSplitter = Splitter.on(",");
@ -142,5 +145,8 @@ public class DispatchService {
// 修改状态
instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, dbInstanceParams, now);
// 装载缓存
instanceMetadataService.loadJobInfo(instanceId, jobInfo);
}
}

View File

@ -11,7 +11,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.local.LocalInstanceLogDO;
import com.github.kfcfans.powerjob.server.persistence.local.LocalInstanceLogRepository;
import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager;
import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
import com.github.kfcfans.powerjob.server.service.instance.InstanceMetadataService;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -46,7 +46,7 @@ import java.util.stream.Stream;
public class InstanceLogService {
@Resource
private InstanceManager instanceManager;
private InstanceMetadataService instanceMetadataService;
@Resource
private GridFsManager gridFsManager;
// 本地数据库操作bean
@ -320,13 +320,12 @@ public class InstanceLogService {
// 定时删除秒级任务的日志
List<Long> frequentInstanceIds = Lists.newLinkedList();
instanceId2LastReportTime.keySet().forEach(instanceId -> {
JobInfoDO jobInfo = instanceManager.fetchJobInfo(instanceId);
if (jobInfo == null) {
return;
}
if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) {
frequentInstanceIds.add(instanceId);
try {
JobInfoDO jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(instanceId);
if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) {
frequentInstanceIds.add(instanceId);
}
}catch (Exception ignore) {
}
});

View File

@ -12,6 +12,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
@ -21,6 +22,7 @@ import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* 任务服务
@ -50,6 +52,8 @@ public class JobService {
*/
public Long saveJob(SaveJobInfoRequest request) throws Exception {
request.valid();
JobInfoDO jobInfoDO;
if (request.getId() != null) {
jobInfoDO = jobInfoRepository.findById(request.getId()).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + request.getId()));
@ -94,16 +98,22 @@ public class JobService {
* 手动立即运行某个任务
* @param jobId 任务ID
* @param instanceParams 任务实例参数 OpenAPI 存在
* @param delay 延迟时间单位 毫秒
* @return 任务实例ID
*/
public long runJob(Long jobId, String instanceParams) {
public long runJob(Long jobId, String instanceParams, long delay) {
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis());
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
instanceInfoRepository.flush();
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
if (delay <= 0) {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
}else {
HashedWheelTimerHolder.TIMER.schedule(() -> {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
}, delay, TimeUnit.MILLISECONDS);
}
return instanceId;
}

View File

@ -65,7 +65,7 @@ public class DefaultMailAlarmService implements Alarmable {
javaMailSender.send(sm);
}catch (Exception e) {
log.error("[OmsMailAlarmService] send mail({}) failed.", sm, e);
log.error("[OmsMailAlarmService] send mail({}) failed, reason is {}", sm, e.getMessage());
}
}
}

View File

@ -30,7 +30,7 @@ public class ClusterStatusHolder {
// 集群中所有机器的最后心跳时间
private Map<String, Long> address2ActiveTime;
private static final long WORKER_TIMEOUT_MS = 30000;
private static final long WORKER_TIMEOUT_MS = 60000;
public ClusterStatusHolder(String appName) {
this.appName = appName;
@ -49,7 +49,7 @@ public class ClusterStatusHolder {
Long oldTime = address2ActiveTime.getOrDefault(workerAddress, -1L);
if (heartbeatTime < oldTime) {
log.warn("[ClusterStatusHolder-{}] receive the old heartbeat: {}.", appName, heartbeat);
log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
return;
}
@ -131,10 +131,25 @@ public class ClusterStatusHolder {
/**
* 释放所有本地存储的容器信息该操作会导致短暂的 listDeployedContainer 服务不可用
*/
public void releaseContainerInfos() {
public void release() {
log.info("[ClusterStatusHolder-{}] clean the containerInfos, listDeployedContainer service may down about 1min~", appName);
// 丢弃原来的所有数据准备重建
containerId2Infos = Maps.newConcurrentMap();
// 丢弃超时机器的信息
List<String> timeoutAddress = Lists.newLinkedList();
address2Metrics.forEach((addr, lastActiveTime) -> {
if (timeout(addr)) {
timeoutAddress.add(addr);
}
});
if (!timeoutAddress.isEmpty()) {
log.info("[ClusterStatusHolder-{}] detective timeout workers({}), try to release their infos.", appName, timeoutAddress);
timeoutAddress.forEach(addr -> {
address2ActiveTime.remove(addr);
address2Metrics.remove(addr);
});
}
}
private boolean timeout(String address) {

View File

@ -43,7 +43,7 @@ public class ServerSelectService {
/**
* 获取某个应用对应的Server
* 缺点如果server死而复生可能造成worker集群脑裂不过感觉影响不是很大 & 概率极低就不管了
*
* @param appId 应用ID
* @return 当前可用的Server
*/

View File

@ -94,9 +94,9 @@ public class WorkerManagerService {
}
/**
* 释放所有本地存储的容器信息该操作会导致短暂的 listDeployedContainer 服务不可用
* 清理缓存信息防止 OOM
*/
public static void releaseContainerInfos() {
appId2ClusterStatus.values().forEach(ClusterStatusHolder::releaseContainerInfos);
public static void cleanUp() {
appId2ClusterStatus.values().forEach(ClusterStatusHolder::release);
}
}

View File

@ -8,7 +8,6 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.service.InstanceLogService;
import com.github.kfcfans.powerjob.server.service.UserService;
@ -16,8 +15,6 @@ import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarmContent;
import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
@ -25,7 +22,6 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
@ -38,10 +34,6 @@ import java.util.concurrent.TimeUnit;
@Service
public class InstanceManager {
// 存储 instanceId 对应的 Job 信息便于重试
private static Cache<Long, JobInfoDO> instanceId2JobInfo;
// Spring Bean
@Resource
private DispatchService dispatchService;
@Resource
@ -49,21 +41,12 @@ public class InstanceManager {
@Resource(name = "omsCenterAlarmService")
private Alarmable omsCenterAlarmService;
@Resource
private InstanceInfoRepository instanceInfoRepository;
private InstanceMetadataService instanceMetadataService;
@Resource
private JobInfoRepository jobInfoRepository;
private InstanceInfoRepository instanceInfoRepository;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
private static final int CACHE_CONCURRENCY_LEVEL = 8;
private static final int CACHE_MAX_SIZE = 4096;
static {
instanceId2JobInfo = CacheBuilder.newBuilder()
.concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
.maximumSize(CACHE_MAX_SIZE)
.build();
}
/**
* 更新任务状态
@ -71,19 +54,18 @@ public class InstanceManager {
*/
public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws Exception {
Long jobId = req.getJobId();
Long instanceId = req.getInstanceId();
// 获取相关数据
JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId, () -> {
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobIno by jobId: " + jobId));
});
JobInfoDO jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(req.getInstanceId());
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfo == null) {
log.warn("[InstanceManager-{}] can't find InstanceInfo from database", instanceId);
return;
}
// 丢弃过期的上报数据
if (req.getReportTime() <= instanceInfo.getLastReportTime()) {
log.warn("[InstanceManager-{}] receive the expired status report request: {}, this report will br dropped.", instanceId, req);
log.warn("[InstanceManager-{}] receive the expired status report request: {}, this report will be dropped.", instanceId, req);
return;
}
@ -105,6 +87,7 @@ public class InstanceManager {
// 综上直接把 status runningNum 同步到DB即可
if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) {
instanceInfo.setResult(req.getResult());
instanceInfo.setRunningTimes(req.getTotalTaskNum());
instanceInfoRepository.saveAndFlush(instanceInfo);
return;
@ -170,9 +153,11 @@ public class InstanceManager {
// 告警
if (status == InstanceStatus.FAILED) {
JobInfoDO jobInfo = fetchJobInfo(instanceId);
if (jobInfo == null) {
log.warn("[InstanceManager] can't find jobInfo by instanceId({}), alarm failed.", instanceId);
JobInfoDO jobInfo;
try {
jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(instanceId);
}catch (Exception e) {
log.warn("[InstanceManager-{}] can't find jobInfo, alarm failed.", instanceId);
return;
}
@ -185,31 +170,8 @@ public class InstanceManager {
omsCenterAlarmService.onJobInstanceFailed(content, userList);
}
// 过期缓存
instanceId2JobInfo.invalidate(instanceId);
// 主动移除缓存减小内存占用
instanceMetadataService.invalidateJobInfo(instanceId);
}
/**
* 根据任务实例ID查询任务相关信息
* @param instanceId 任务实例ID
* @return 任务元数据
*/
public JobInfoDO fetchJobInfo(Long instanceId) {
JobInfoDO jobInfo = instanceId2JobInfo.getIfPresent(instanceId);
if (jobInfo != null) {
return jobInfo;
}
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfo != null) {
return jobInfoRepository.findById(instanceInfo.getJobId()).orElse(null);
}
return null;
}
/**
* 释放本地缓存防止内存泄漏
*/
public static void releaseCache() {
instanceId2JobInfo.cleanUp();
}
}

View File

@ -0,0 +1,80 @@
package com.github.kfcfans.powerjob.server.service.instance;

import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

/**
 * Caches the {@link JobInfoDO} metadata associated with each task instance.
 *
 * @author tjq
 * @since 2020/6/23
 */
@Service
public class InstanceMetadataService implements InitializingBean {

    @Resource
    private JobInfoRepository jobInfoRepository;
    @Resource
    private InstanceInfoRepository instanceInfoRepository;

    // Snapshot cache: once an instance is created, the JobInfo it runs with should
    // not change, even if the source job definition is edited afterwards.
    private Cache<Long, JobInfoDO> instanceId2JobInfoCache;

    // Maximum number of cached entries, configurable via application properties.
    @Value("${oms.instance.metadata.cache.size}")
    private int instanceMetadataCacheSize;

    private static final int CACHE_CONCURRENCY_LEVEL = 4;

    @Override
    public void afterPropertiesSet() throws Exception {
        // Build the cache here (not in a field initializer) because the
        // @Value-injected size is only available after property injection.
        instanceId2JobInfoCache = CacheBuilder.newBuilder()
                .concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
                .maximumSize(instanceMetadataCacheSize)
                .build();
    }

    /**
     * Looks up the JobInfo for a task instance, loading it from the database on a cache miss.
     *
     * @param instanceId instanceId
     * @return the cached or freshly loaded JobInfoDO
     * @throws ExecutionException if the cache loader fails (e.g. instance or job not found)
     */
    public JobInfoDO fetchJobInfoByInstanceId(Long instanceId) throws ExecutionException {
        return instanceId2JobInfoCache.get(instanceId, () -> {
            InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
            if (instanceInfo == null) {
                throw new IllegalArgumentException("can't find Instance by instanceId: " + instanceId);
            }
            Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(instanceInfo.getJobId());
            return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobInfo by jobId: " + instanceInfo.getJobId()));
        });
    }

    /**
     * Pre-populates the cache, avoiding a database round-trip on the first fetch.
     *
     * @param instanceId instanceId
     * @param jobInfoDO the original job metadata to associate with the instance
     */
    public void loadJobInfo(Long instanceId, JobInfoDO jobInfoDO) {
        instanceId2JobInfoCache.put(instanceId, jobInfoDO);
    }

    /**
     * Evicts a finished instance's entry to keep memory usage bounded.
     *
     * @param instanceId instanceId
     */
    public void invalidateJobInfo(Long instanceId) {
        instanceId2JobInfoCache.invalidate(instanceId);
    }
}

View File

@ -5,7 +5,6 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
@ -59,8 +58,7 @@ public class CleanService {
public void timingClean() {
// 释放本地缓存
WorkerManagerService.releaseContainerInfos();
InstanceManager.releaseCache();
WorkerManagerService.cleanUp();
// 删除数据库运行记录
cleanInstanceLog();

View File

@ -258,7 +258,7 @@ public class OmsScheduleService {
}
log.info("[FrequentScheduler] These frequent jobs will be scheduled {}.", notRunningJobIds);
notRunningJobIds.forEach(jobId -> jobService.runJob(jobId, null));
notRunningJobIds.forEach(jobId -> jobService.runJob(jobId, null, 0));
}catch (Exception e) {
log.error("[FrequentScheduler] schedule frequent job failed.", e);
}

View File

@ -39,6 +39,8 @@ public class WorkflowService {
*/
public Long saveWorkflow(SaveWorkflowRequest req) throws Exception {
req.valid();
if (!WorkflowDAGUtils.valid(req.getPEWorkflowDAG())) {
throw new OmsException("illegal DAG");
}

View File

@ -4,7 +4,7 @@ import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType;
import com.github.kfcfans.powerjob.server.common.constans.ContainerStatus;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.ContainerTemplateGenerator;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
@ -87,7 +87,8 @@ public class ContainerController {
@GetMapping("/list")
public ResultDTO<List<ContainerInfoVO>> listContainers(Long appId) {
List<ContainerInfoVO> res = containerInfoRepository.findByAppId(appId).stream().map(ContainerController::convert).collect(Collectors.toList());
List<ContainerInfoVO> res = containerInfoRepository.findByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV())
.stream().map(ContainerController::convert).collect(Collectors.toList());
return ResultDTO.success(res);
}
@ -122,7 +123,7 @@ public class ContainerController {
}else {
vo.setLastDeployTime(DateFormatUtils.format(containerInfoDO.getLastDeployTime(), OmsConstant.TIME_PATTERN));
}
ContainerStatus status = ContainerStatus.of(containerInfoDO.getStatus());
SwitchableStatus status = SwitchableStatus.of(containerInfoDO.getStatus());
vo.setStatus(status.name());
ContainerSourceType sourceType = ContainerSourceType.of(containerInfoDO.getSourceType());
vo.setSourceType(sourceType.name());

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.server.web.controller;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
@ -21,6 +22,7 @@ import org.springframework.data.domain.Example;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
@ -112,6 +114,10 @@ public class InstanceController {
BeanUtils.copyProperties(request, queryEntity);
queryEntity.setType(request.getType().getV());
if (!StringUtils.isEmpty(request.getStatus())) {
queryEntity.setStatus(InstanceStatus.valueOf(request.getStatus()).getV());
}
Page<InstanceInfoDO> pageResult = instanceInfoRepository.findAll(Example.of(queryEntity), pageable);
return ResultDTO.success(convertPage(pageResult));
}

View File

@ -63,7 +63,7 @@ public class JobController {
@GetMapping("/run")
public ResultDTO<Long> runImmediately(String jobId) {
return ResultDTO.success(jobService.runJob(Long.valueOf(jobId), null));
return ResultDTO.success(jobService.runJob(Long.valueOf(jobId), null, 0));
}
@PostMapping("/list")

View File

@ -80,9 +80,9 @@ public class OpenAPIController {
}
@PostMapping(OpenAPIConstant.RUN_JOB)
public ResultDTO<Long> runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams) {
public ResultDTO<Long> runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams, @RequestParam(required = false) Long delay) {
checkJobIdValid(jobId, appId);
return ResultDTO.success(jobService.runJob(jobId, instanceParams));
return ResultDTO.success(jobService.runJob(jobId, instanceParams, delay == null ? 0 : delay));
}
/* ************* Instance 区 ************* */

View File

@ -46,4 +46,9 @@ public class ServerController {
return ResultDTO.success(server);
}
@GetMapping("/hello")
public ResultDTO<String> ping() {
return ResultDTO.success("this is powerjob-server~");
}
}

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.web.controller;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.model.SystemMetrics;
import com.github.kfcfans.powerjob.common.response.AskResponse;
@ -18,6 +19,7 @@ import com.github.kfcfans.powerjob.server.web.response.SystemOverviewVO;
import com.github.kfcfans.powerjob.server.web.response.WorkerStatusVO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
@ -57,7 +59,7 @@ public class SystemInfoController {
}
String server =appInfoOpt.get().getCurrentServer();
// 没有Server
// 没有 Server说明从来没有该 appId worker 集群连接过
if (StringUtils.isEmpty(server)) {
return ResultDTO.success(Collections.emptyList());
}
@ -103,8 +105,10 @@ public class SystemInfoController {
Date date = DateUtils.addDays(new Date(), -1);
overview.setFailedInstanceCount(instanceInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date));
// 服务器时区
overview.setTimezone(TimeZone.getDefault().getDisplayName());
// 服务器时间
overview.setServerTime(System.currentTimeMillis());
overview.setServerTime(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN));
return ResultDTO.success(overview);
}

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.server.web.controller;
import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.persistence.PageResult;
import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInstanceInfoDO;
@ -13,6 +14,7 @@ import org.springframework.data.domain.Example;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
@ -54,6 +56,11 @@ public class WorkflowInstanceController {
WorkflowInstanceInfoDO queryEntity = new WorkflowInstanceInfoDO();
BeanUtils.copyProperties(req, queryEntity);
if (!StringUtils.isEmpty(req.getStatus())) {
queryEntity.setStatus(WorkflowInstanceStatus.valueOf(req.getStatus()).getV());
}
Page<WorkflowInstanceInfoDO> ps = workflowInstanceInfoRepository.findAll(Example.of(queryEntity), pageable);
return ResultDTO.success(convertPage(ps));

View File

@ -24,4 +24,6 @@ public class QueryInstanceRequest {
private Long instanceId;
private Long jobId;
private Long wfInstanceId;
private String status;
}

View File

@ -21,4 +21,6 @@ public class QueryWorkflowInstanceRequest {
// 查询条件NORMAL/WORKFLOW
private Long wfInstanceId;
private Long workflowId;
private String status;
}

View File

@ -1,7 +1,8 @@
package com.github.kfcfans.powerjob.server.web.request;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType;
import com.github.kfcfans.powerjob.server.common.constans.ContainerStatus;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import lombok.Data;
/**
@ -28,5 +29,11 @@ public class SaveContainerInfoRequest {
private String sourceInfo;
// 状态枚举值为 ContainerStatusENABLE/DISABLE
private ContainerStatus status;
private SwitchableStatus status;
public void valid() {
CommonUtils.requireNonNull(containerName, "containerName can't be empty");
CommonUtils.requireNonNull(appId, "appId can't be empty");
CommonUtils.requireNonNull(sourceInfo, "sourceInfo can't be empty");
}
}

View File

@ -13,6 +13,8 @@ public class SystemOverviewVO {
private long jobCount;
private long runningInstanceCount;
private long failedInstanceCount;
// 服务器时区
private String timezone;
// 服务器时间
private long serverTime;
private String serverTime;
}

View File

@ -3,7 +3,7 @@ logging.config=classpath:logback-dev.xml
####### 数据库配置 #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/powerjob-daily?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
@ -21,8 +21,11 @@ spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
####### 资源清理配置 #######
oms.log.retention.local=0
oms.log.retention.remote=0
oms.container.retention.local=0
oms.container.retention.remote=0
oms.instanceinfo.retention=0
oms.log.retention.local=1
oms.log.retention.remote=1
oms.container.retention.local=1
oms.container.retention.remote=-1
oms.instanceinfo.retention=1
####### 缓存配置 #######
oms.instance.metadata.cache.size=1024

View File

@ -3,7 +3,7 @@ logging.config=classpath:logback-product.xml
####### 数据库配置 #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/powerjob-pre?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-pre?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
@ -25,4 +25,7 @@ oms.log.retention.local=3
oms.log.retention.remote=3
oms.container.retention.local=3
oms.container.retention.remote=-1
oms.instanceinfo.retention=3
oms.instanceinfo.retention=3
####### 缓存配置 #######
oms.instance.metadata.cache.size=1024

View File

@ -25,4 +25,7 @@ oms.log.retention.local=7
oms.log.retention.remote=7
oms.container.retention.local=7
oms.container.retention.remote=-1
oms.instanceinfo.retention=3
oms.instanceinfo.retention=3
####### 缓存配置 #######
oms.instance.metadata.cache.size=2048

View File

@ -5,7 +5,7 @@
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width,initial-scale=1.0">
<link rel="icon" href="/favicon.ico">
<title>oms-console</title>
<title>PowerJob</title>
<link href="/js/0.js" rel="prefetch"><link href="/js/1.js" rel="prefetch"><link href="/js/10.js" rel="prefetch"><link href="/js/11.js" rel="prefetch"><link href="/js/2.js" rel="prefetch"><link href="/js/3.js" rel="prefetch"><link href="/js/4.js" rel="prefetch"><link href="/js/5.js" rel="prefetch"><link href="/js/6.js" rel="prefetch"><link href="/js/7.js" rel="prefetch"><link href="/js/8.js" rel="prefetch"><link href="/js/9.js" rel="prefetch"><link href="/js/app.js" rel="preload" as="script"><link href="/js/chunk-vendors.js" rel="preload" as="script"></head>
<body>
<noscript>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,34 @@
package com.github.kfcfans.powerjob.server.test;

import com.github.kfcfans.powerjob.common.OmsConstant;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.junit.jupiter.api.Test;

import java.util.Date;
import java.util.TimeZone;

/**
 * Timezone behavior test: prints the current time before and after switching
 * the JVM default timezone to GMT, demonstrating how {@link Date#toString()}
 * and formatted output change while the raw epoch timestamp from
 * {@code System.currentTimeMillis()} is printed alongside for comparison.
 *
 * @author tjq
 * @since 2020/6/24
 */
public class TimezoneTest {

    @Test
    public void testTimeZone() {
        Date now = new Date();
        System.out.println(now.toString());
        System.out.println("timestamp before GMT: " + System.currentTimeMillis());

        // Switch the JVM-wide default timezone; this affects subsequent Date
        // rendering/formatting in this JVM (note: leaks into later tests).
        TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
        TimeZone timeZone = TimeZone.getDefault();
        System.out.println(timeZone.getDisplayName());
        System.out.println(new Date());
        System.out.println(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN));
        System.out.println("timestamp after GMT: " + System.currentTimeMillis());
    }
}

View File

@ -1,11 +1,11 @@
# agent 没有 javac 需求,用 JRE 镜像
FROM openjdk:8-jre-slim
# 为了便于使用 arthasagent 也使用 jdk 而不是 jre
FROM adoptopenjdk:8-jdk-hotspot
MAINTAINER tengjiqi@gmail.com
# 设置时区
ENV TZ=Asia/Shanghai
ENV APP_NAME=powerjob-worker-agent
# 传递 SpringBoot 启动参数 和 JVM参数
ENV PARAMS=""
ENV JVMOPTIONS=""
COPY powerjob-agent.jar /powerjob-agent.jar
# 暴露端口AKKA-Client
@ -13,4 +13,4 @@ EXPOSE 27777
# 挂载数据卷,将文件直接输出到宿主机(注意,此处挂载的是匿名卷,即在宿主机位置随机)
VOLUME /root
# 启动应用
ENTRYPOINT ["sh","-c","java -jar /powerjob-agent.jar $PARAMS"]
ENTRYPOINT ["sh","-c","java $JVMOPTIONS -jar /powerjob-agent.jar $PARAMS"]

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.1.0</version>
<version>3.1.3</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.1.0</powerjob.worker.version>
<powerjob.worker.version>3.1.3</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.1.0</version>
<version>3.1.3</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.version>3.1.0</powerjob.worker.version>
<powerjob.worker.version>3.1.3</powerjob.worker.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->

View File

@ -31,7 +31,7 @@ public class OhMySchedulerConfig {
// 1. 创建配置文件
OhMyConfig config = new OhMyConfig();
config.setPort(port);
config.setAppName("oms-test");
config.setAppName("powerjob-agent-test");
config.setServerAddress(serverAddress);
// 如果没有大型 Map/MapReduce 的需求建议使用内存来加速计算
// 为了本地模拟多个实例只能使用 MEMORY 启动文件只能由一个应用占有

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.1.0</version>
<version>3.1.3</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.1.0</powerjob.common.version>
<powerjob.common.version>3.1.3</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.worker.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.powerjob.common.request.ServerDestroyContainerRequest;
import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory;
import lombok.extern.slf4j.Slf4j;
@ -18,6 +19,7 @@ public class WorkerActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder()
.match(ServerDeployContainerRequest.class, this::onReceiveServerDeployContainerRequest)
.match(ServerDestroyContainerRequest.class, this::onReceiveServerDestroyContainerRequest)
.matchAny(obj -> log.warn("[WorkerActor] receive unknown request: {}.", obj))
.build();
}
@ -25,4 +27,8 @@ public class WorkerActor extends AbstractActor {
private void onReceiveServerDeployContainerRequest(ServerDeployContainerRequest request) {
OmsContainerFactory.deployContainer(request);
}
// Handles a server request to tear down a deployed container;
// delegates to OmsContainerFactory, which owns the container cache.
private void onReceiveServerDestroyContainerRequest(ServerDestroyContainerRequest request) {
OmsContainerFactory.destroyContainer(request.getContainerId());
}
}

View File

@ -62,18 +62,18 @@ public class ServerDiscoveryService {
}
if (StringUtils.isEmpty(result)) {
log.warn("[OMS-ServerDiscoveryService] can't find any available server, this worker has been quarantined.");
log.warn("[OmsServerDiscovery] can't find any available server, this worker has been quarantined.");
// Server 高可用的前提下连续失败多次说明该节点与外界失联Server已经将秒级任务转移到其他Worker需要杀死本地的任务
if (FAILED_COUNT++ > MAX_FAILED_COUNT) {
log.error("[OMS-ServerDiscoveryService] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");
log.warn("[OmsServerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");
List<Long> frequentInstanceIds = TaskTrackerPool.getAllFrequentTaskTrackerKeys();
if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
frequentInstanceIds.forEach(instanceId -> {
TaskTracker taskTracker = TaskTrackerPool.remove(instanceId);
taskTracker.destroy();
log.warn("[OMS-ServerDiscoveryService] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);
log.warn("[OmsServerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);
});
}
@ -83,7 +83,7 @@ public class ServerDiscoveryService {
}else {
// 重置失败次数
FAILED_COUNT = 0;
log.debug("[OMS-ServerDiscoveryService] current server is {}.", result);
log.debug("[OmsServerDiscovery] current server is {}.", result);
return result;
}
}

View File

@ -1,26 +1,37 @@
package com.github.kfcfans.powerjob.worker.common.utils;
import java.util.LinkedHashMap;
import java.util.Map;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.function.BiConsumer;
/**
* LRULeast Recently Used 缓存
* before v3.1.1 使用 LinkedHashMap但存在修改时访问报错问题改用 Guava
*
* @author tjq
* @since 2020/4/8
*/
public class LRUCache<K, V> extends LinkedHashMap<K, V> {
public class LRUCache<K, V> {
private final int cacheSize;
private final Cache<K, V> innerCache;
public LRUCache(int cacheSize) {
super((int) Math.ceil(cacheSize / 0.75) + 1, 0.75f, false);
this.cacheSize = cacheSize;
innerCache = CacheBuilder.newBuilder()
.concurrencyLevel(2)
.maximumSize(cacheSize)
.build();
}
@Override
protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
// 超过阈值时返回true进行LRU淘汰
return size() > cacheSize;
public void forEach(BiConsumer<? super K, ? super V> action) {
innerCache.asMap().forEach(action);
}
public V get(K key) {
return innerCache.getIfPresent(key);
}
public void put(K key, V value) {
innerCache.put(key, value);
}
}

View File

@ -7,6 +7,7 @@ import com.github.kfcfans.powerjob.common.model.DeployedContainerInfo;
import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerRequest;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils;
@ -38,16 +39,21 @@ public class OmsContainerFactory {
/**
* 获取容器
* @param containerId 容器ID
* @param loadFromServer 当本地不存在时尝试从 server 加载
* @return 容器示例可能为 null
*/
public static OmsContainer getContainer(Long containerId) {
public static OmsContainer fetchContainer(Long containerId, boolean loadFromServer) {
OmsContainer omsContainer = CARGO.get(containerId);
if (omsContainer != null) {
return omsContainer;
}
// 尝试下载
if (!loadFromServer) {
return null;
}
// 尝试从 server 加载
log.info("[OmsContainer-{}] can't find the container in factory, try to deploy from server.", containerId);
WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(containerId);
@ -65,9 +71,11 @@ public class OmsContainerFactory {
ServerDeployContainerRequest deployRequest = askResponse.getData(ServerDeployContainerRequest.class);
log.info("[OmsContainer-{}] fetch containerInfo from server successfully.", containerId);
deployContainer(deployRequest);
}else {
log.warn("[OmsContainer-{}] fetch containerInfo failed, reason is {}.", containerId, askResponse.getMessage());
}
}catch (Exception e) {
log.error("[OmsContainer-{}] deployed container failed, exception is {}", containerId, e.toString());
log.error("[OmsContainer-{}] get container failed, exception is {}", containerId, e.toString());
}
return CARGO.get(containerId);
@ -92,11 +100,11 @@ public class OmsContainerFactory {
return;
}
try {
String filePath = OmsWorkerFileUtils.getContainerDir() + containerId + "/" + version + ".jar";
// 下载Container到本地
File jarFile = new File(filePath);
// 下载Container到本地
String filePath = OmsWorkerFileUtils.getContainerDir() + containerId + "/" + version + ".jar";
File jarFile = new File(filePath);
try {
if (!jarFile.exists()) {
FileUtils.forceMkdirParent(jarFile);
FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000);
@ -118,6 +126,8 @@ public class OmsContainerFactory {
}catch (Exception e) {
log.error("[OmsContainer-{}] deployContainer(name={},version={}) failed.", containerId, containerName, version, e);
// 如果部署失败则删除该 jar本次失败可能是下载jar出错导致不删除会导致这个版本永久无法重新部署
CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(jarFile));
}
}
@ -130,4 +140,21 @@ public class OmsContainerFactory {
CARGO.forEach((name, container) -> info.add(new DeployedContainerInfo(container.getContainerId(), container.getVersion(), container.getDeployedTime(), null)));
return info;
}
/**
 * Destroy the container with the given id and remove it from the local registry (CARGO).
 * A no-op (logged at info level) if no such container is cached.
 *
 * @param containerId id of the container to destroy
 */
public static void destroyContainer(Long containerId) {
OmsContainer container = CARGO.remove(containerId);
if (container == null) {
log.info("[OmsContainer-{}] container not exists, so there is no need to destroy the container.", containerId);
return;
}
try {
// Best-effort teardown: a failing destroy must not propagate to the caller,
// since the container has already been removed from the registry above.
container.destroy();
}catch (Exception e) {
log.warn("[OmsContainer-{}] destroy container failed.", containerId, e);
}
}
}

View File

@ -190,7 +190,7 @@ public class OmsJarContainer implements OmsContainer {
// 需要满足的条件引用计数器减为0 & 有更新的容器出现
if (referenceCount.decrementAndGet() <= 0) {
OmsContainer container = OmsContainerFactory.getContainer(containerId);
OmsContainer container = OmsContainerFactory.fetchContainer(containerId, false);
if (container != this) {
try {
destroy();

View File

@ -309,9 +309,11 @@ public class ProcessorTracker {
String[] split = processorInfo.split("#");
log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", instanceId, split[1], split[0]);
omsContainer = OmsContainerFactory.getContainer(Long.valueOf(split[0]));
omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), true);
if (omsContainer != null) {
processor = omsContainer.getProcessor(split[1]);
}else {
log.warn("[ProcessorTracker-{}] load container failed.", instanceId);
}
break;
default:

View File

@ -61,6 +61,7 @@ public class FrequentTaskTracker extends TaskTracker {
private static final int HISTORY_SIZE = 10;
private static final String LAST_TASK_ID_PREFIX = "L";
private static final int MIN_INTERVAL = 1000;
protected FrequentTaskTracker(ServerScheduleJobReq req) {
super(req);
@ -89,6 +90,10 @@ public class FrequentTaskTracker extends TaskTracker {
// 2. 启动任务发射器
launcher = new Launcher();
if (timeExpressionType == TimeExpressionType.FIX_RATE) {
// 固定频率需要设置最小间隔
if (timeParams < MIN_INTERVAL) {
throw new OmsException("time interval too small, please set the timeExpressionInfo >= 1000");
}
scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS);
}else {
scheduledPool.schedule(launcher, 0, TimeUnit.MILLISECONDS);
@ -97,8 +102,7 @@ public class FrequentTaskTracker extends TaskTracker {
// 3. 启动任务分发器事实上秒级任务应该都是单机任务且感觉不需要失败重试机制那么 Dispatcher 的存在就有点浪费系统资源了...
scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 1, 2, TimeUnit.SECONDS);
// 4. 启动状态检查器
scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(timeParams, 10000), TimeUnit.MILLISECONDS);
scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS);
}
@Override
@ -114,7 +118,7 @@ public class FrequentTaskTracker extends TaskTracker {
InstanceDetail.SubInstanceDetail subDetail = new InstanceDetail.SubInstanceDetail();
BeanUtils.copyProperties(subInstanceInfo, subDetail);
InstanceStatus status = InstanceStatus.of(subInstanceInfo.status);
subDetail.setStatus(status.getDes());
subDetail.setStatus(status.getV());
subDetail.setSubInstanceId(subId);
// 设置时间
@ -213,6 +217,11 @@ public class FrequentTaskTracker extends TaskTracker {
@Override
public void run() {
if (finished.get()) {
return;
}
try {
checkStatus();
reportStatus();
@ -347,8 +356,8 @@ public class FrequentTaskTracker extends TaskTracker {
subInstanceId2TimeHolder.remove(subInstanceId);
// 更新缓存数据
if (recentSubInstanceInfo.containsKey(subInstanceId)) {
SubInstanceInfo subInstanceInfo = recentSubInstanceInfo.get(subInstanceId);
SubInstanceInfo subInstanceInfo = recentSubInstanceInfo.get(subInstanceId);
if (subInstanceInfo != null) {
subInstanceInfo.status = success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV();
subInstanceInfo.result = result;
subInstanceInfo.finishedTime = System.currentTimeMillis();

View File

@ -0,0 +1,23 @@
package com.github.kfcfans.powerjob.function;
import com.github.kfcfans.powerjob.worker.common.utils.LRUCache;
import org.junit.jupiter.api.Test;
/**
* LRU cache test
*
* @author tjq
* @since 2020/6/26
*/
public class LRUCacheTest {
@Test
public void testCache() {
LRUCache<Long, String> cache = new LRUCache<>(10);
for (long i = 0; i < 100; i++) {
cache.put(i, "STR:" + i);
}
cache.forEach((x, y) -> System.out.println("key:" + x));
}
}