diff --git a/README.md b/README.md index 2eaabf71..62e8fa8f 100644 --- a/README.md +++ b/README.md @@ -22,11 +22,21 @@ PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架, * 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。 * 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。 -[在线试用地址](https://www.yuque.com/powerjob/guidence/hnbskn) ### 适用场景 * 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。 * 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。 * 有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce处理器完成任务的分发,调动整个集群加速计算。 +* 有需要延迟执行某些任务的业务场景:比如订单过期处理等。 + +### 设计目标 +PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server,旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。 + +### 在线试用 +试用地址:[try.powerjob.tech](http://try.powerjob.tech/) +试用应用名称:powerjob-agent-test +控制台密码:123 + +[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn) ### 同类产品对比 | | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | @@ -43,7 +53,9 @@ PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架, # 文档 -**[超详细中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** OR **[备用地址(内容可能更新不及时)](https://kfcfans.github.io/)** +**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** + +**[Document](https://www.yuque.com/powerjob/en/xrdoqw)** PS:感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献! diff --git a/README_enUS.md b/README_enUS.md index 60acd36e..aed3811c 100644 --- a/README_enUS.md +++ b/README_enUS.md @@ -1,13 +1,15 @@

-OhMyScheduler +PowerJob

- - +actions +Maven Central +GitHub release (latest SemVer) +LICENSE

-OhMyScheduler is a powerful distributed scheduling platform and distributed computing framework based on Akka architecture.It provides you a chance to schedule job and distributed computing easily. +PowerJob is a powerful distributed scheduling platform and distributed computing framework based on Akka architecture.It provides you a chance to schedule job and distributed computing easily. # Introduction @@ -30,7 +32,7 @@ OhMyScheduler is a powerful distributed scheduling platform and distributed comp ### Comparison of similar products -| | QuartZ | xxl-job | SchedulerX 2.0 | OhMyScheduler | +| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | | ---------------------------------- | --------------------------------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | | Timing type | CRON | CRON | CRON, fixed frequency, fixed delay, OpenAPI | **CRON, fixed frequency, fixed delay, OpenAPI** | | Task type | Built-in Java | Built-in Java, GLUE Java, Shell, Python and other scripts | Built-in Java, external Java (FatJar), Shell, Python and other scripts | **Built-in Java, external Java (container), Shell, Python and other scripts** | @@ -43,9 +45,9 @@ OhMyScheduler is a powerful distributed scheduling platform and distributed comp | workflow | not support | not support | support | **support** | # Document -**[GitHub Wiki](https://github.com/KFCFans/OhMyScheduler/wiki)** +**[GitHub Wiki](https://github.com/KFCFans/PowerJob/wiki)** -**[中文文档](https://www.yuque.com/ohmyscheduler/product)** +**[中文文档](https://www.yuque.com/powerjob/product)** # Others diff --git a/others/oms-sql.sql b/others/oms-sql.sql index fca1aca6..332cd8ee 100644 --- a/others/oms-sql.sql +++ b/others/oms-sql.sql @@ -2,14 +2,14 @@ Navicat Premium Data Transfer Source Server Type : MySQL - Source Server Version : 50724 - Source Schema : oms-product + Source Server Version : 80020 + Source Schema : powerjob-product Target Server Type : MySQL - Target Server Version : 50724 + Target Server Version : 80020 File Encoding : 65001 - Date: 07/06/2020 11:11:47 + Date: 23/06/2020 22:30:06 */ SET NAMES utf8mb4; @@ -20,130 +20,131 @@ SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- DROP TABLE IF EXISTS `app_info`; CREATE TABLE `app_info` ( - `id` bigint(20) NOT NULL AUTO_INCREMENT, + `id` bigint NOT NULL AUTO_INCREMENT, `app_name` varchar(255) DEFAULT NULL, `current_server` varchar(255) DEFAULT NULL, - `description` varchar(255) DEFAULT NULL, `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, + `password` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `appNameUK` (`app_name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for container_info -- ---------------------------- DROP TABLE IF EXISTS `container_info`; CREATE TABLE `container_info` ( - `id` bigint(20) NOT NULL AUTO_INCREMENT, - `app_id` bigint(20) DEFAULT NULL, + `id` bigint NOT NULL AUTO_INCREMENT, + `app_id` bigint DEFAULT NULL, `container_name` varchar(255) DEFAULT NULL, `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, `last_deploy_time` datetime(6) DEFAULT NULL, `source_info` varchar(255) DEFAULT NULL, - `source_type` int(11) DEFAULT NULL, - `status` int(11) DEFAULT NULL, + `source_type` int DEFAULT NULL, + `status` int DEFAULT NULL, `version` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`), KEY `IDX8hixyaktlnwil2w9up6b0p898` (`app_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for instance_info -- ---------------------------- DROP TABLE IF EXISTS `instance_info`; CREATE TABLE `instance_info` ( - `id` bigint(20) NOT NULL AUTO_INCREMENT, - `actual_trigger_time` bigint(20) DEFAULT NULL, - `app_id` bigint(20) DEFAULT NULL, - `expected_trigger_time` bigint(20) DEFAULT NULL, - `finished_time` bigint(20) DEFAULT NULL, + `id` bigint NOT NULL AUTO_INCREMENT, + `actual_trigger_time` bigint DEFAULT NULL, + `app_id` bigint DEFAULT NULL, + `expected_trigger_time` bigint DEFAULT NULL, + `finished_time` bigint DEFAULT NULL, `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, - `instance_id` bigint(20) DEFAULT NULL, + `instance_id` bigint DEFAULT NULL, `instance_params` text, - `job_id` bigint(20) DEFAULT NULL, + `job_id` bigint DEFAULT NULL, + `last_report_time` bigint DEFAULT NULL, `result` text, - `running_times` bigint(20) DEFAULT NULL, - `status` int(11) DEFAULT NULL, + `running_times` bigint DEFAULT NULL, + `status` int DEFAULT NULL, `task_tracker_address` varchar(255) DEFAULT NULL, - `type` int(11) DEFAULT NULL, - `wf_instance_id` bigint(20) DEFAULT NULL, + `type` int DEFAULT NULL, + `wf_instance_id` bigint DEFAULT NULL, PRIMARY KEY (`id`), KEY `IDX5b1nhpe5je7gc5s1ur200njr7` (`job_id`), KEY `IDXjnji5lrr195kswk6f7mfhinrs` (`app_id`), KEY `IDXa98hq3yu0l863wuotdjl7noum` (`instance_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for job_info -- ---------------------------- DROP TABLE IF EXISTS `job_info`; CREATE TABLE `job_info` ( - `id` bigint(20) NOT NULL AUTO_INCREMENT, - `app_id` bigint(20) DEFAULT NULL, - `concurrency` int(11) DEFAULT NULL, + `id` bigint NOT NULL AUTO_INCREMENT, + `app_id` bigint DEFAULT NULL, + `concurrency` int DEFAULT NULL, `designated_workers` varchar(255) DEFAULT NULL, - `execute_type` int(11) DEFAULT NULL, + `execute_type` int DEFAULT NULL, `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, - `instance_retry_num` int(11) DEFAULT NULL, - `instance_time_limit` bigint(20) DEFAULT NULL, + `instance_retry_num` int DEFAULT NULL, + `instance_time_limit` bigint DEFAULT NULL, `job_description` varchar(255) DEFAULT NULL, `job_name` varchar(255) DEFAULT NULL, `job_params` varchar(255) DEFAULT NULL, - `max_instance_num` int(11) DEFAULT NULL, - `max_worker_count` int(11) DEFAULT NULL, + `max_instance_num` int DEFAULT NULL, + `max_worker_count` int DEFAULT NULL, `min_cpu_cores` double NOT NULL, `min_disk_space` double NOT NULL, `min_memory_space` double NOT NULL, - `next_trigger_time` bigint(20) DEFAULT NULL, + `next_trigger_time` bigint DEFAULT NULL, `notify_user_ids` varchar(255) DEFAULT NULL, `processor_info` text, - `processor_type` int(11) DEFAULT NULL, - `status` int(11) DEFAULT NULL, - `task_retry_num` int(11) DEFAULT NULL, + `processor_type` int DEFAULT NULL, + `status` int DEFAULT NULL, + `task_retry_num` int DEFAULT NULL, `time_expression` varchar(255) DEFAULT NULL, - `time_expression_type` int(11) DEFAULT NULL, + `time_expression_type` int DEFAULT NULL, PRIMARY KEY (`id`), KEY `IDXk2xprmn3lldmlcb52i36udll1` (`app_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for oms_lock -- ---------------------------- DROP TABLE IF EXISTS `oms_lock`; CREATE TABLE `oms_lock` ( - `id` bigint(20) NOT NULL AUTO_INCREMENT, + `id` bigint NOT NULL AUTO_INCREMENT, `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, `lock_name` varchar(255) DEFAULT NULL, - `max_lock_time` bigint(20) DEFAULT NULL, + `max_lock_time` bigint DEFAULT NULL, `ownerip` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `lockNameUK` (`lock_name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for server_info -- ---------------------------- DROP TABLE IF EXISTS `server_info`; CREATE TABLE `server_info` ( - `id` bigint(20) NOT NULL AUTO_INCREMENT, + `id` bigint NOT NULL AUTO_INCREMENT, `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, `ip` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `UKtk8ytgpl7mpukhnvhbl82kgvy` (`ip`) -) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4; +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for user_info -- ---------------------------- DROP TABLE IF EXISTS `user_info`; CREATE TABLE `user_info` ( - `id` bigint(20) NOT NULL AUTO_INCREMENT, + `id` bigint NOT NULL AUTO_INCREMENT, `email` varchar(255) DEFAULT NULL, `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, @@ -151,47 +152,47 @@ CREATE TABLE `user_info` ( `phone` varchar(255) DEFAULT NULL, `username` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for workflow_info -- ---------------------------- DROP TABLE IF EXISTS `workflow_info`; CREATE TABLE `workflow_info` ( - `id` bigint(20) NOT NULL AUTO_INCREMENT, - `app_id` bigint(20) DEFAULT NULL, + `id` bigint NOT NULL AUTO_INCREMENT, + `app_id` bigint DEFAULT NULL, `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, - `max_wf_instance_num` int(11) DEFAULT NULL, - `next_trigger_time` bigint(20) DEFAULT NULL, + `max_wf_instance_num` int DEFAULT NULL, + `next_trigger_time` bigint DEFAULT NULL, `notify_user_ids` varchar(255) DEFAULT NULL, `pedag` text, - `status` int(11) DEFAULT NULL, + `status` int DEFAULT NULL, `time_expression` varchar(255) DEFAULT NULL, - `time_expression_type` int(11) DEFAULT NULL, + `time_expression_type` int DEFAULT NULL, `wf_description` varchar(255) DEFAULT NULL, `wf_name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`), KEY `IDX7uo5w0e3beeho3fnx9t7eiol3` (`app_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- ---------------------------- -- Table structure for workflow_instance_info -- ---------------------------- DROP TABLE IF EXISTS `workflow_instance_info`; CREATE TABLE `workflow_instance_info` ( - `id` bigint(20) NOT NULL AUTO_INCREMENT, - `actual_trigger_time` bigint(20) DEFAULT NULL, - `app_id` bigint(20) DEFAULT NULL, + `id` bigint NOT NULL AUTO_INCREMENT, + `actual_trigger_time` bigint DEFAULT NULL, + `app_id` bigint DEFAULT NULL, `dag` text, - `finished_time` bigint(20) DEFAULT NULL, + `finished_time` bigint DEFAULT NULL, `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, `result` text, - `status` int(11) DEFAULT NULL, - `wf_instance_id` bigint(20) DEFAULT NULL, - `workflow_id` bigint(20) DEFAULT NULL, + `status` int DEFAULT NULL, + `wf_instance_id` bigint DEFAULT NULL, + `workflow_id` bigint DEFAULT NULL, PRIMARY KEY (`id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; SET FOREIGN_KEY_CHECKS = 1; diff --git a/others/script/build_docker.sh b/others/script/build_docker.sh index 3d91f178..d245a8dc 100755 --- a/others/script/build_docker.sh +++ b/others/script/build_docker.sh @@ -62,8 +62,10 @@ if [ "$startup" = "y" ] || [ "$startup" = "Y" ]; then echo "================== 准备启动 powerjob-server ==================" docker run -d \ --name powerjob-server \ - -p 7700:7700 -p 10086:10086 \ + -p 7700:7700 -p 10086:10086 -p 5001:5005 -p 10001:10000 \ + -e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \ -e PARAMS="--spring.profiles.active=pre" \ + -e TZ="Asia/Shanghai" \ -v ~/docker/powerjob-server:/root/powerjob-server -v ~/.m2:/root/.m2 \ tjqq/powerjob-server:$version sleep 1 @@ -74,8 +76,21 @@ if [ "$startup" = "y" ] || [ "$startup" = "Y" ]; then serverIP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' powerjob-server) serverAddress="$serverIP:7700" echo "使用的Server地址:$serverAddress" - docker run -d -e PARAMS="--app powerjob-agent-test --server $serverAddress" -p 27777:27777 --name powerjob-agent -v ~/docker/powerjob-agent:/root tjqq/powerjob-agent:$version - docker run -d -e PARAMS="--app powerjob-agent-test --server $serverAddress" -p 27778:27777 --name powerjob-agent2 -v ~/docker/powerjob-agent2:/root tjqq/powerjob-agent:$version + docker run -d \ + --name powerjob-agent \ + -p 27777:27777 -p 5002:5005 -p 10002:10000 \ + -e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \ + -e PARAMS="--app powerjob-agent-test --server $serverAddress" \ + -v ~/docker/powerjob-agent:/root \ + tjqq/powerjob-agent:$version + + docker run -d \ + --name powerjob-agent2 \ + -p 27778:27777 -p 5003:5005 -p 10003:10000 \ + -e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005" \ + -e PARAMS="--app powerjob-agent-test --server $serverAddress" \ + -v ~/docker/powerjob-agent2:/root \ + tjqq/powerjob-agent:$version tail -f -n 100 ~/docker/powerjob-agent/powerjob/logs/powerjob-agent-application.log fi \ No newline at end of file diff --git a/others/script/debug.sh b/others/script/debug.sh deleted file mode 100644 index e0fa1656..00000000 --- a/others/script/debug.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/sh -# 一键部署脚本,请勿挪动脚本 -cd `dirname $0`/../.. || exit -echo "================== 构建 jar ==================" -mvn clean package -DskipTests -Pdev -e -U -echo "================== 拷贝 jar ==================" -/bin/cp -rf powerjob-server/target/*.jar others/powerjob-server.jar -ls -l others/powerjob-server.jar -echo "================== debug 模式启动 ==================" -nohup java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -jar others/powerjob-server.jar > powerjob-server.log & -sleep 100 -tail --pid=$$ -f -n 1000 others/powerjob-server.log diff --git a/others/script/jenkins_auto.sh b/others/script/jenkins_auto.sh deleted file mode 100644 index 3af4c6fa..00000000 --- a/others/script/jenkins_auto.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/bash -cd `dirname $0`/../.. || exit -echo "================== 构建 jar ==================" -mvn clean package -Pdev -DskipTests -U -e -pl powerjob-server,powerjob-worker-agent -am -echo "================== 拷贝 jar ==================" -/bin/cp -rf powerjob-server/target/*.jar powerjob-server/docker/powerjob-server.jar -/bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar -echo "================== 关闭老应用 ==================" -docker stop powerjob-server -docker stop powerjob-agent -docker stop powerjob-agent2 -echo "================== 删除老容器 ==================" -docker container rm powerjob-server -docker container rm powerjob-agent -docker container rm powerjob-agent2 -echo "================== 删除旧镜像 ==================" -docker rmi -f tjqq/powerjob-server:latest -docker rmi -f tjqq/powerjob-agent:latest -echo "================== 构建 powerjob-server 镜像 ==================" -docker build -t tjqq/powerjob-server:latest powerjob-server/docker/. || exit -echo "================== 构建 powerjob-agent 镜像 ==================" -docker build -t tjqq/powerjob-agent:latest powerjob-worker-agent/. || exit -echo "================== 准备启动 powerjob-server ==================" -docker run -d \ - --name powerjob-server \ - -p 7700:7700 -p 10086:10086 \ - -e PARAMS="--spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://172.27.147.252:3306/oms-product?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8 --spring.data.mongodb.uri=mongodb://172.27.147.252:27017/oms-product" \ - -v ~/docker/powerjob-server:/root/powerjob-server -v ~/.m2:/root/.m2 \ - tjqq/powerjob-server:latest -sleep 60 -echo "================== 准备启动 powerjob-client ==================" -serverIP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' powerjob-server) -serverAddress="$serverIP:7700" -echo "使用的Server地址:$serverAddress" -docker run -d -e PARAMS="--app powerjob-agent-test --server $serverAddress" -p 27777:27777 --name powerjob-agent -v ~/docker/powerjob-agent:/root tjqq/powerjob-agent:latest -docker run -d -e PARAMS="--app powerjob-agent-test --server $serverAddress" -p 27778:27777 --name powerjob-agent2 -v ~/docker/powerjob-agent2:/root tjqq/powerjob-agent:latest - diff --git a/others/script/jenkins_auto_build.sh b/others/script/jenkins_auto_build.sh new file mode 100755 index 00000000..31dedf78 --- /dev/null +++ b/others/script/jenkins_auto_build.sh @@ -0,0 +1,55 @@ +#!/bin/bash +cd `dirname $0`/../.. || exit +echo "================== 构建 jar ==================" +mvn clean package -Pdev -DskipTests -U -e -pl powerjob-server,powerjob-worker-agent -am +echo "================== 拷贝 jar ==================" +/bin/cp -rf powerjob-server/target/*.jar powerjob-server/docker/powerjob-server.jar +/bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar +echo "================== 关闭老应用 ==================" +docker stop powerjob-server +docker stop powerjob-agent +docker stop powerjob-agent2 +echo "================== 删除老容器 ==================" +docker container rm powerjob-server +docker container rm powerjob-agent +docker container rm powerjob-agent2 +echo "================== 删除旧镜像 ==================" +docker rmi -f tjqq/powerjob-server:latest +docker rmi -f tjqq/powerjob-agent:latest +echo "================== 构建 powerjob-server 镜像 ==================" +docker build -t tjqq/powerjob-server:latest powerjob-server/docker/. || exit +echo "================== 构建 powerjob-agent 镜像 ==================" +docker build -t tjqq/powerjob-agent:latest powerjob-worker-agent/. || exit +echo "================== 准备启动 powerjob-server ==================" +docker run -d \ + --restart=always \ + --name powerjob-server \ + -p 7700:7700 -p 10086:10086 -p 5001:5005 -p 10001:10000 \ + -e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \ + -e PARAMS="--spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-product" \ + -v ~/docker/powerjob-server:/root/powerjob-server -v ~/.m2:/root/.m2 \ + tjqq/powerjob-server:latest +sleep 60 +echo "================== 准备启动 powerjob-agent ==================" +serverIP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' powerjob-server) +serverAddress="$serverIP:7700" +echo "使用的Server地址:$serverAddress" + +docker run -d \ + --restart=always \ + --name powerjob-agent \ + -p 27777:27777 -p 5002:5005 -p 10002:10000 \ + -e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \ + -e PARAMS="--app powerjob-agent-test --server $serverAddress" \ + -v ~/docker/powerjob-agent:/root \ + tjqq/powerjob-agent:latest + +docker run -d \ + --restart=always \ + --name powerjob-agent2 \ + -p 27778:27777 -p 5003:5005 -p 10003:10000 \ + -e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \ + -e PARAMS="--app powerjob-agent-test --server $serverAddress" \ + -v ~/docker/powerjob-agent2:/root \ + tjqq/powerjob-agent:latest + diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index 951ca22b..376f6977 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-client - 3.1.0 + 3.1.3 jar - 3.1.0 + 3.1.3 5.6.1 diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index b6ee21b8..54d00ad5 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -66,9 +66,11 @@ public class OhMyClient { appId = Long.parseLong(resultDTO.getData().toString()); currentAddress = addr; break; + }else { + throw new OmsException(resultDTO.getMessage()); } } - }catch (Exception ignore) { + }catch (IOException ignore) { } } @@ -173,13 +175,15 @@ public class OhMyClient { * 运行某个任务 * @param jobId 任务ID * @param instanceParams 任务实例的参数 + * @param delayMS 延迟时间,单位毫秒 * @return 任务实例ID(instanceId) * @throws Exception 异常 */ - public ResultDTO runJob(Long jobId, String instanceParams) throws Exception { + public ResultDTO runJob(Long jobId, String instanceParams, long delayMS) throws Exception { FormBody.Builder builder = new FormBody.Builder() .add("jobId", jobId.toString()) - .add("appId", appId.toString()); + .add("appId", appId.toString()) + .add("delay", String.valueOf(delayMS)); if (StringUtils.isNotEmpty(instanceParams)) { builder.add("instanceParams", instanceParams); @@ -188,7 +192,7 @@ public class OhMyClient { return JsonUtils.parseObject(post, ResultDTO.class); } public ResultDTO runJob(Long jobId) throws Exception { - return runJob(jobId, null); + return runJob(jobId, null, 0); } /* ************* Instance 区 ************* */ diff --git a/powerjob-client/src/test/java/TestClient.java b/powerjob-client/src/test/java/TestClient.java index 2306dffc..f1f7dec8 100644 --- a/powerjob-client/src/test/java/TestClient.java +++ b/powerjob-client/src/test/java/TestClient.java @@ -21,7 +21,7 @@ public class TestClient { @BeforeAll public static void initClient() throws Exception { - ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test2", null); + ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123"); } @Test @@ -70,7 +70,7 @@ public class TestClient { @Test public void testRunJob() throws Exception { - System.out.println(ohMyClient.runJob(8L, "this is instanceParams")); + System.out.println(ohMyClient.runJob(6L, "this is instanceParams", 60000)); } @Test diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index 0e9d02c1..b4ebcc1f 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.1.0 + 3.1.3 jar diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java index 4f6a7fca..c9ab4eaa 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java @@ -43,7 +43,7 @@ public class InstanceDetail implements OmsSerializable { private String startTime; private String finishedTime; private String result; - private String status; + private int status; } // MapReduce 和 Broadcast 任务的 extra -> diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerDestroyContainerRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerDestroyContainerRequest.java index aa7ea83f..718f543f 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerDestroyContainerRequest.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerDestroyContainerRequest.java @@ -15,5 +15,5 @@ import lombok.NoArgsConstructor; @NoArgsConstructor @AllArgsConstructor public class ServerDestroyContainerRequest implements OmsSerializable { - private String containerName; + private Long containerId; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java index b2fa7f78..ca3b5c2c 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java @@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.common.request.http; import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.ProcessorType; import com.github.kfcfans.powerjob.common.TimeExpressionType; +import com.github.kfcfans.powerjob.common.utils.CommonUtils; import lombok.Data; import java.util.List; @@ -76,4 +77,14 @@ public class SaveJobInfoRequest { // 报警用户ID列表 private List notifyUserIds; + + + public void valid() { + CommonUtils.requireNonNull(jobName, "jobName can't be empty"); + CommonUtils.requireNonNull(appId, "appId can't be empty"); + CommonUtils.requireNonNull(processorInfo, "processorInfo can't be empty"); + CommonUtils.requireNonNull(executeType, "executeType can't be empty"); + CommonUtils.requireNonNull(processorType, "processorType can't be empty"); + CommonUtils.requireNonNull(timeExpressionType, "timeExpressionType can't be empty"); + } } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java index ec06754a..3e0e03e1 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java @@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.common.request.http; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; +import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.google.common.collect.Lists; import lombok.Data; @@ -43,4 +44,11 @@ public class SaveWorkflowRequest { // 工作流整体失败的报警 private List notifyUserIds = Lists.newLinkedList(); + + public void valid() { + CommonUtils.requireNonNull(wfName, "workflow name can't be empty"); + CommonUtils.requireNonNull(appId, "appId can't be empty"); + CommonUtils.requireNonNull(pEWorkflowDAG, "dag can't be empty"); + CommonUtils.requireNonNull(timeExpressionType, "timeExpressionType can't be empty"); + } } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java index 8126dde8..52277188 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java @@ -1,8 +1,11 @@ package com.github.kfcfans.powerjob.common.utils; +import com.github.kfcfans.powerjob.common.OmsException; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import java.util.Collection; +import java.util.Objects; import java.util.function.Supplier; @@ -114,4 +117,16 @@ public class CommonUtils { return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } + public static T requireNonNull(T obj, String msg) { + if (obj == null) { + throw new OmsException(msg); + } + if (obj instanceof String) { + if (StringUtils.isEmpty((String) obj)) { + throw new OmsException(msg); + } + } + return obj; + } + } diff --git a/powerjob-server/docker/Dockerfile b/powerjob-server/docker/Dockerfile index 473d4f62..62127270 100644 --- a/powerjob-server/docker/Dockerfile +++ b/powerjob-server/docker/Dockerfile @@ -1,5 +1,5 @@ -# 基础镜像 -FROM openjdk:8 +# 基础镜像(支持 amd64 & arm64),based on Ubuntu 18.04.4 LTS +FROM adoptopenjdk:8-jdk-hotspot # 维护者 MAINTAINER tengjiqi@gmail.com # 下载并安装 maven @@ -11,13 +11,14 @@ COPY settings.xml /opt/powerjob-maven/conf/settings.xml # 设置 maven 环境变量(maven invoker 读取该变量调用 maven) ENV M2_HOME=/opt/powerjob-maven -# 设置时区(Debian专用方法) +# 设置时区 ENV TZ=Asia/Shanghai # 设置其他环境变量 ENV APP_NAME=powerjob-server -# 传递 SpringBoot 启动参数 +# 传递 SpringBoot 启动参数 和 JVM参数 ENV PARAMS="" +ENV JVMOPTIONS="" # 将应用 jar 包拷入 docker COPY powerjob-server.jar /powerjob-server.jar # 暴露端口(HTTP + AKKA) @@ -27,4 +28,4 @@ RUN mkdir -p /root/powerjob-server # 挂载数据卷,将文件直接输出到宿主机(注意,此处挂载的是匿名卷,即在宿主机位置随机) VOLUME /root/powerjob-server # 启动应用 -ENTRYPOINT ["sh","-c","java -jar /powerjob-server.jar $PARAMS"] +ENTRYPOINT ["sh","-c","java $JVMOPTIONS -jar /powerjob-server.jar $PARAMS"] diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 59d9120e..7f3cdbf5 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.1.0 + 3.1.3 jar 2.9.2 2.2.6.RELEASE - 3.1.0 + 3.1.3 8.0.19 1.4.200 2.5.2 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java index 6acb41fb..fc83f9a2 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java @@ -10,6 +10,7 @@ import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerReque import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; +import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository; @@ -91,8 +92,10 @@ public class ServerActor extends AbstractActor { Optional containerInfoOpt = containerInfoRepository.findById(req.getContainerId()); AskResponse askResponse = new AskResponse(); - askResponse.setSuccess(false); - if (containerInfoOpt.isPresent()) { + if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) { + askResponse.setSuccess(false); + askResponse.setMessage("can't find container by id: " + req.getContainerId()); + }else { ContainerInfoDO containerInfo = containerInfoOpt.get(); askResponse.setSuccess(true); @@ -104,7 +107,6 @@ public class ServerActor extends AbstractActor { askResponse.setData(JsonUtils.toBytes(dpReq)); } - getSender().tell(askResponse, getSelf()); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/ContainerStatus.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/ContainerStatus.java deleted file mode 100644 index f93b4e90..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/ContainerStatus.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.github.kfcfans.powerjob.server.common.constans; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -/** - * 容器状态 - * 由于命名约束,准备采取硬删除策略 - * - * @author tjq - * @since 2020/5/15 - */ -@Getter -@AllArgsConstructor -public enum ContainerStatus { - - ENABLE(1), - DISABLE(2), - DELETED(99); - - private int v; - - public static ContainerStatus of(int v) { - for (ContainerStatus type : values()) { - if (type.v == v) { - return type; - } - } - throw new IllegalArgumentException("unknown ContainerStatus of " + v); - } -} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/CronExpression.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/CronExpression.java index e753cd49..de59f0f5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/CronExpression.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/CronExpression.java @@ -1,5 +1,19 @@ package com.github.kfcfans.powerjob.server.common.utils; +/* +Copyright [2020] [KFCFans] +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ import java.io.Serializable; import java.text.ParseException; import java.util.Calendar; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java index 41960391..b1077bb1 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java @@ -37,8 +37,6 @@ public class HashedWheelTimer implements Timer { private final ExecutorService taskProcessPool; - private static final int MAXIMUM_CAPACITY = 1 << 30; - public HashedWheelTimer(long tickDuration, int ticksPerWheel) { this(tickDuration, ticksPerWheel, 0); } @@ -47,9 +45,9 @@ public class HashedWheelTimer implements Timer { * 新建时间轮定时器 * @param tickDuration 时间间隔,单位毫秒(ms) * @param ticksPerWheel 轮盘个数 - * @param processTaskNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池) + * @param processThreadNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池) */ - public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processTaskNum) { + public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) { this.tickDuration = tickDuration; @@ -62,12 +60,13 @@ public class HashedWheelTimer implements Timer { mask = wheel.length - 1; // 初始化执行线程池 - if (processTaskNum <= 0) { + if (processThreadNum <= 0) { taskProcessPool = null; }else { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build(); BlockingQueue queue = Queues.newLinkedBlockingQueue(16); - taskProcessPool = new ThreadPoolExecutor(2, processTaskNum, + int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum); + taskProcessPool = new ThreadPoolExecutor(core, 2 * core, 60, TimeUnit.SECONDS, queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/TimerFuture.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/TimerFuture.java index 61ec9ed6..68f50609 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/TimerFuture.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/TimerFuture.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.server.common.utils.timewheel; /** - * description + * TimerFuture * * @author tjq * @since 2020/4/3 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ContainerInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ContainerInfoRepository.java index 1f913039..f1a4c916 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ContainerInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ContainerInfoRepository.java @@ -13,5 +13,5 @@ import java.util.List; */ public interface ContainerInfoRepository extends JpaRepository { - List findByAppId(Long appId); + List findByAppIdAndStatusNot(Long appId, Integer status); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java index 3c434699..e5ef8513 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java @@ -68,7 +68,7 @@ public interface InstanceInfoRepository extends JpaRepository findByJobIdInAndStatusIn(List jobIds, List status); // 删除历史数据,JPA自带的删除居然是根据ID循环删,2000条数据删了几秒,也太拉垮了吧... - // 结果只能用 int 接受 + // 结果只能用 int 接收 @Modifying @Transactional @Query(value = "delete from instance_info where gmt_modified < ?1", nativeQuery = true) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java index aaf05ac8..0f10b0b4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java @@ -12,6 +12,7 @@ import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.common.utils.SegmentLock; import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType; +import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils; import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository; @@ -85,6 +86,9 @@ public class ContainerService { * @param request 容器保存请求 */ public void save(SaveContainerInfoRequest request) { + + request.valid(); + ContainerInfoDO container; Long originId = request.getId(); if (originId != null) { @@ -120,15 +124,17 @@ public class ContainerService { throw new RuntimeException("Permission Denied!"); } - ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getContainerName()); + ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId()); WorkerManagerService.getActiveWorkerInfo(container.getAppId()).keySet().forEach(akkaAddress -> { ActorSelection workerActor = OhMyServer.getWorkerActor(akkaAddress); workerActor.tell(destroyRequest, null); }); log.info("[ContainerService] delete container: {}.", container); - // 硬删除算了...留着好像也没什么用 - containerInfoRepository.deleteById(containerId); + // 软删除 + container.setStatus(SwitchableStatus.DELETED.getV()); + container.setGmtModified(new Date()); + containerInfoRepository.saveAndFlush(container); } /** diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index 1f94bd77..7f5200a8 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -9,6 +9,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; +import com.github.kfcfans.powerjob.server.service.instance.InstanceMetadataService; import com.google.common.base.Splitter; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -39,6 +40,8 @@ public class DispatchService { @Resource private InstanceManager instanceManager; @Resource + private InstanceMetadataService instanceMetadataService; + @Resource private InstanceInfoRepository instanceInfoRepository; private static final Splitter commaSplitter = Splitter.on(","); @@ -142,5 +145,8 @@ public class DispatchService { // 修改状态 instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, dbInstanceParams, now); + + // 装载缓存 + instanceMetadataService.loadJobInfo(instanceId, jobInfo); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java index da3c03d5..025a5720 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java @@ -11,7 +11,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.local.LocalInstanceLogDO; import com.github.kfcfans.powerjob.server.persistence.local.LocalInstanceLogRepository; import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager; -import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; +import com.github.kfcfans.powerjob.server.service.instance.InstanceMetadataService; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -46,7 +46,7 @@ import java.util.stream.Stream; public class InstanceLogService { @Resource - private InstanceManager instanceManager; + private InstanceMetadataService instanceMetadataService; @Resource private GridFsManager gridFsManager; // 本地数据库操作bean @@ -320,13 +320,12 @@ public class InstanceLogService { // 定时删除秒级任务的日志 List frequentInstanceIds = Lists.newLinkedList(); instanceId2LastReportTime.keySet().forEach(instanceId -> { - JobInfoDO jobInfo = instanceManager.fetchJobInfo(instanceId); - if (jobInfo == null) { - return; - } - - if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) { - frequentInstanceIds.add(instanceId); + try { + JobInfoDO jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(instanceId); + if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) { + frequentInstanceIds.add(instanceId); + } + }catch (Exception ignore) { } }); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 6c187d45..b485edf2 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -12,6 +12,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.service.instance.InstanceService; +import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; @@ -21,6 +22,7 @@ import javax.annotation.Resource; import java.util.Date; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; /** * 任务服务 @@ -50,6 +52,8 @@ public class JobService { */ public Long saveJob(SaveJobInfoRequest request) throws Exception { + request.valid(); + JobInfoDO jobInfoDO; if (request.getId() != null) { jobInfoDO = jobInfoRepository.findById(request.getId()).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + request.getId())); @@ -94,16 +98,22 @@ public class JobService { * 手动立即运行某个任务 * @param jobId 任务ID * @param instanceParams 任务实例参数(仅 OpenAPI 存在) + * @param delay 延迟时间,单位 毫秒 * @return 任务实例ID */ - public long runJob(Long jobId, String instanceParams) { + public long runJob(Long jobId, String instanceParams, long delay) { JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); - - Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis()); + Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0)); instanceInfoRepository.flush(); - dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); + if (delay <= 0) { + dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); + }else { + HashedWheelTimerHolder.TIMER.schedule(() -> { + dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); + }, delay, TimeUnit.MILLISECONDS); + } return instanceId; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/DefaultMailAlarmService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/DefaultMailAlarmService.java index d96e20b1..1c494f62 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/DefaultMailAlarmService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/alarm/DefaultMailAlarmService.java @@ -65,7 +65,7 @@ public class DefaultMailAlarmService implements Alarmable { javaMailSender.send(sm); }catch (Exception e) { - log.error("[OmsMailAlarmService] send mail({}) failed.", sm, e); + log.error("[OmsMailAlarmService] send mail({}) failed, reason is {}", sm, e.getMessage()); } } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ClusterStatusHolder.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ClusterStatusHolder.java index 46c664cf..abaabc61 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ClusterStatusHolder.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ClusterStatusHolder.java @@ -30,7 +30,7 @@ public class ClusterStatusHolder { // 集群中所有机器的最后心跳时间 private Map address2ActiveTime; - private static final long WORKER_TIMEOUT_MS = 30000; + private static final long WORKER_TIMEOUT_MS = 60000; public ClusterStatusHolder(String appName) { this.appName = appName; @@ -49,7 +49,7 @@ public class ClusterStatusHolder { Long oldTime = address2ActiveTime.getOrDefault(workerAddress, -1L); if (heartbeatTime < oldTime) { - log.warn("[ClusterStatusHolder-{}] receive the old heartbeat: {}.", appName, heartbeat); + log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime()); return; } @@ -131,10 +131,25 @@ public class ClusterStatusHolder { /** * 释放所有本地存储的容器信息(该操作会导致短暂的 listDeployedContainer 服务不可用) */ - public void releaseContainerInfos() { + public void release() { log.info("[ClusterStatusHolder-{}] clean the containerInfos, listDeployedContainer service may down about 1min~", appName); // 丢弃原来的所有数据,准备重建 containerId2Infos = Maps.newConcurrentMap(); + + // 丢弃超时机器的信息 + List timeoutAddress = Lists.newLinkedList(); + address2Metrics.forEach((addr, lastActiveTime) -> { + if (timeout(addr)) { + timeoutAddress.add(addr); + } + }); + if (!timeoutAddress.isEmpty()) { + log.info("[ClusterStatusHolder-{}] detective timeout workers({}), try to release their infos.", appName, timeoutAddress); + timeoutAddress.forEach(addr -> { + address2ActiveTime.remove(addr); + address2Metrics.remove(addr); + }); + } } private boolean timeout(String address) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java index 5b232a3e..e9037176 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java @@ -43,7 +43,7 @@ public class ServerSelectService { /** * 获取某个应用对应的Server - * 缺点:如果server死而复生,可能造成worker集群脑裂,不过感觉影响不是很大 & 概率极低,就不管了 + * * @param appId 应用ID * @return 当前可用的Server */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/WorkerManagerService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/WorkerManagerService.java index 0ab2bf02..02b667fc 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/WorkerManagerService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/WorkerManagerService.java @@ -94,9 +94,9 @@ public class WorkerManagerService { } /** - * 释放所有本地存储的容器信息(该操作会导致短暂的 listDeployedContainer 服务不可用) + * 清理缓存信息,防止 OOM */ - public static void releaseContainerInfos() { - appId2ClusterStatus.values().forEach(ClusterStatusHolder::releaseContainerInfos); + public static void cleanUp() { + appId2ClusterStatus.values().forEach(ClusterStatusHolder::release); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java index cb386261..a49e084c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java @@ -8,7 +8,6 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; -import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.service.DispatchService; import com.github.kfcfans.powerjob.server.service.InstanceLogService; import com.github.kfcfans.powerjob.server.service.UserService; @@ -16,8 +15,6 @@ import com.github.kfcfans.powerjob.server.service.alarm.Alarmable; import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarmContent; import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder; import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; @@ -25,7 +22,6 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Date; import java.util.List; -import java.util.Optional; import java.util.concurrent.TimeUnit; /** @@ -38,10 +34,6 @@ import java.util.concurrent.TimeUnit; @Service public class InstanceManager { - // 存储 instanceId 对应的 Job 信息,便于重试 - private static Cache instanceId2JobInfo; - - // Spring Bean @Resource private DispatchService dispatchService; @Resource @@ -49,21 +41,12 @@ public class InstanceManager { @Resource(name = "omsCenterAlarmService") private Alarmable omsCenterAlarmService; @Resource - private InstanceInfoRepository instanceInfoRepository; + private InstanceMetadataService instanceMetadataService; @Resource - private JobInfoRepository jobInfoRepository; + private InstanceInfoRepository instanceInfoRepository; @Resource private WorkflowInstanceManager workflowInstanceManager; - private static final int CACHE_CONCURRENCY_LEVEL = 8; - private static final int CACHE_MAX_SIZE = 4096; - - static { - instanceId2JobInfo = CacheBuilder.newBuilder() - .concurrencyLevel(CACHE_CONCURRENCY_LEVEL) - .maximumSize(CACHE_MAX_SIZE) - .build(); - } /** * 更新任务状态 @@ -71,19 +54,18 @@ public class InstanceManager { */ public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws Exception { - Long jobId = req.getJobId(); Long instanceId = req.getInstanceId(); // 获取相关数据 - JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId, () -> { - Optional jobInfoOpt = jobInfoRepository.findById(jobId); - return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobIno by jobId: " + jobId)); - }); + JobInfoDO jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(req.getInstanceId()); InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); - + if (instanceInfo == null) { + log.warn("[InstanceManager-{}] can't find InstanceInfo from database", instanceId); + return; + } // 丢弃过期的上报数据 if (req.getReportTime() <= instanceInfo.getLastReportTime()) { - log.warn("[InstanceManager-{}] receive the expired status report request: {}, this report will br dropped.", instanceId, req); + log.warn("[InstanceManager-{}] receive the expired status report request: {}, this report will be dropped.", instanceId, req); return; } @@ -105,6 +87,7 @@ public class InstanceManager { // 综上,直接把 status 和 runningNum 同步到DB即可 if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) { + instanceInfo.setResult(req.getResult()); instanceInfo.setRunningTimes(req.getTotalTaskNum()); instanceInfoRepository.saveAndFlush(instanceInfo); return; @@ -170,9 +153,11 @@ public class InstanceManager { // 告警 if (status == InstanceStatus.FAILED) { - JobInfoDO jobInfo = fetchJobInfo(instanceId); - if (jobInfo == null) { - log.warn("[InstanceManager] can't find jobInfo by instanceId({}), alarm failed.", instanceId); + JobInfoDO jobInfo; + try { + jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(instanceId); + }catch (Exception e) { + log.warn("[InstanceManager-{}] can't find jobInfo, alarm failed.", instanceId); return; } @@ -185,31 +170,8 @@ public class InstanceManager { omsCenterAlarmService.onJobInstanceFailed(content, userList); } - // 过期缓存 - instanceId2JobInfo.invalidate(instanceId); + // 主动移除缓存,减小内存占用 + instanceMetadataService.invalidateJobInfo(instanceId); } - /** - * 根据任务实例ID查询任务相关信息 - * @param instanceId 任务实例ID - * @return 任务元数据 - */ - public JobInfoDO fetchJobInfo(Long instanceId) { - JobInfoDO jobInfo = instanceId2JobInfo.getIfPresent(instanceId); - if (jobInfo != null) { - return jobInfo; - } - InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); - if (instanceInfo != null) { - return jobInfoRepository.findById(instanceInfo.getJobId()).orElse(null); - } - return null; - } - - /** - * 释放本地缓存,防止内存泄漏 - */ - public static void releaseCache() { - instanceId2JobInfo.cleanUp(); - } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceMetadataService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceMetadataService.java new file mode 100644 index 00000000..42d59b48 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceMetadataService.java @@ -0,0 +1,80 @@ +package com.github.kfcfans.powerjob.server.service.instance; + +import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; +import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +/** + * 存储 instance 对应的 JobInfo 信息 + * + * @author tjq + * @since 2020/6/23 + */ +@Service +public class InstanceMetadataService implements InitializingBean { + + @Resource + private JobInfoRepository jobInfoRepository; + @Resource + private InstanceInfoRepository instanceInfoRepository; + + // 缓存,一旦生成任务实例,其对应的 JobInfo 不应该再改变(即使源数据改变) + private Cache instanceId2JobInfoCache; + + @Value("${oms.instance.metadata.cache.size}") + private int instanceMetadataCacheSize; + private static final int CACHE_CONCURRENCY_LEVEL = 4; + + @Override + public void afterPropertiesSet() throws Exception { + instanceId2JobInfoCache = CacheBuilder.newBuilder() + .concurrencyLevel(CACHE_CONCURRENCY_LEVEL) + .maximumSize(instanceMetadataCacheSize) + .build(); + } + + /** + * 根据 instanceId 获取 JobInfo + * @param instanceId instanceId + * @return JobInfoDO + * @throws ExecutionException 异常 + */ + public JobInfoDO fetchJobInfoByInstanceId(Long instanceId) throws ExecutionException { + return instanceId2JobInfoCache.get(instanceId, () -> { + InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); + if (instanceInfo != null) { + Optional jobInfoOpt = jobInfoRepository.findById(instanceInfo.getJobId()); + return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobInfo by jobId: " + instanceInfo.getJobId())); + } + throw new IllegalArgumentException("can't find Instance by instanceId: " + instanceId); + }); + } + + /** + * 装载缓存 + * @param instanceId instanceId + * @param jobInfoDO 原始的任务数据 + */ + public void loadJobInfo(Long instanceId, JobInfoDO jobInfoDO) { + instanceId2JobInfoCache.put(instanceId, jobInfoDO); + } + + /** + * 失效缓存 + * @param instanceId instanceId + */ + public void invalidateJobInfo(Long instanceId) { + instanceId2JobInfoCache.invalidate(instanceId); + } + +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index e3b9fd88..781dd5e5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -5,7 +5,6 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; -import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import lombok.extern.slf4j.Slf4j; @@ -59,8 +58,7 @@ public class CleanService { public void timingClean() { // 释放本地缓存 - WorkerManagerService.releaseContainerInfos(); - InstanceManager.releaseCache(); + WorkerManagerService.cleanUp(); // 删除数据库运行记录 cleanInstanceLog(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index 0ebcd68a..d4aa4efd 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -258,7 +258,7 @@ public class OmsScheduleService { } log.info("[FrequentScheduler] These frequent jobs will be scheduled: {}.", notRunningJobIds); - notRunningJobIds.forEach(jobId -> jobService.runJob(jobId, null)); + notRunningJobIds.forEach(jobId -> jobService.runJob(jobId, null, 0)); }catch (Exception e) { log.error("[FrequentScheduler] schedule frequent job failed.", e); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java index c2884f65..63e224b5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java @@ -39,6 +39,8 @@ public class WorkflowService { */ public Long saveWorkflow(SaveWorkflowRequest req) throws Exception { + req.valid(); + if (!WorkflowDAGUtils.valid(req.getPEWorkflowDAG())) { throw new OmsException("illegal DAG"); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ContainerController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ContainerController.java index c11eae81..bd0453a4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ContainerController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ContainerController.java @@ -4,7 +4,7 @@ import com.github.kfcfans.powerjob.common.OmsConstant; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType; -import com.github.kfcfans.powerjob.server.common.constans.ContainerStatus; +import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.ContainerTemplateGenerator; import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; @@ -87,7 +87,8 @@ public class ContainerController { @GetMapping("/list") public ResultDTO> listContainers(Long appId) { - List res = containerInfoRepository.findByAppId(appId).stream().map(ContainerController::convert).collect(Collectors.toList()); + List res = containerInfoRepository.findByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV()) + .stream().map(ContainerController::convert).collect(Collectors.toList()); return ResultDTO.success(res); } @@ -122,7 +123,7 @@ public class ContainerController { }else { vo.setLastDeployTime(DateFormatUtils.format(containerInfoDO.getLastDeployTime(), OmsConstant.TIME_PATTERN)); } - ContainerStatus status = ContainerStatus.of(containerInfoDO.getStatus()); + SwitchableStatus status = SwitchableStatus.of(containerInfoDO.getStatus()); vo.setStatus(status.name()); ContainerSourceType sourceType = ContainerSourceType.of(containerInfoDO.getSourceType()); vo.setSourceType(sourceType.name()); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java index af55157a..537710e8 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.server.web.controller; +import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.server.akka.OhMyServer; @@ -21,6 +22,7 @@ import org.springframework.data.domain.Example; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; +import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; @@ -112,6 +114,10 @@ public class InstanceController { BeanUtils.copyProperties(request, queryEntity); queryEntity.setType(request.getType().getV()); + if (!StringUtils.isEmpty(request.getStatus())) { + queryEntity.setStatus(InstanceStatus.valueOf(request.getStatus()).getV()); + } + Page pageResult = instanceInfoRepository.findAll(Example.of(queryEntity), pageable); return ResultDTO.success(convertPage(pageResult)); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/JobController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/JobController.java index 34b476e9..c8afcab9 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/JobController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/JobController.java @@ -63,7 +63,7 @@ public class JobController { @GetMapping("/run") public ResultDTO runImmediately(String jobId) { - return ResultDTO.success(jobService.runJob(Long.valueOf(jobId), null)); + return ResultDTO.success(jobService.runJob(Long.valueOf(jobId), null, 0)); } @PostMapping("/list") diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java index df4662c2..b9519f9c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java @@ -80,9 +80,9 @@ public class OpenAPIController { } @PostMapping(OpenAPIConstant.RUN_JOB) - public ResultDTO runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams) { + public ResultDTO runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams, @RequestParam(required = false) Long delay) { checkJobIdValid(jobId, appId); - return ResultDTO.success(jobService.runJob(jobId, instanceParams)); + return ResultDTO.success(jobService.runJob(jobId, instanceParams, delay == null ? 0 : delay)); } /* ************* Instance 区 ************* */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java index 6a3c2588..768a6f71 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java @@ -46,4 +46,9 @@ public class ServerController { return ResultDTO.success(server); } + @GetMapping("/hello") + public ResultDTO ping() { + return ResultDTO.success("this is powerjob-server~"); + } + } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java index f238b75f..1a7508ed 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java @@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.web.controller; import akka.actor.ActorSelection; import akka.pattern.Patterns; import com.github.kfcfans.powerjob.common.InstanceStatus; +import com.github.kfcfans.powerjob.common.OmsConstant; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.model.SystemMetrics; import com.github.kfcfans.powerjob.common.response.AskResponse; @@ -18,6 +19,7 @@ import com.github.kfcfans.powerjob.server.web.response.SystemOverviewVO; import com.github.kfcfans.powerjob.server.web.response.WorkerStatusVO; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.time.DateUtils; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.GetMapping; @@ -57,7 +59,7 @@ public class SystemInfoController { } String server =appInfoOpt.get().getCurrentServer(); - // 没有Server + // 没有 Server,说明从来没有该 appId 的 worker 集群连接过 if (StringUtils.isEmpty(server)) { return ResultDTO.success(Collections.emptyList()); } @@ -103,8 +105,10 @@ public class SystemInfoController { Date date = DateUtils.addDays(new Date(), -1); overview.setFailedInstanceCount(instanceInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date)); + // 服务器时区 + overview.setTimezone(TimeZone.getDefault().getDisplayName()); // 服务器时间 - overview.setServerTime(System.currentTimeMillis()); + overview.setServerTime(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN)); return ResultDTO.success(overview); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowInstanceController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowInstanceController.java index 99258ec9..67c1101f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowInstanceController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/WorkflowInstanceController.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.server.web.controller; +import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.server.persistence.PageResult; import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInstanceInfoDO; @@ -13,6 +14,7 @@ import org.springframework.data.domain.Example; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; +import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; @@ -54,6 +56,11 @@ public class WorkflowInstanceController { WorkflowInstanceInfoDO queryEntity = new WorkflowInstanceInfoDO(); BeanUtils.copyProperties(req, queryEntity); + + if (!StringUtils.isEmpty(req.getStatus())) { + queryEntity.setStatus(WorkflowInstanceStatus.valueOf(req.getStatus()).getV()); + } + Page ps = workflowInstanceInfoRepository.findAll(Example.of(queryEntity), pageable); return ResultDTO.success(convertPage(ps)); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/QueryInstanceRequest.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/QueryInstanceRequest.java index dae309a7..cededd6d 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/QueryInstanceRequest.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/QueryInstanceRequest.java @@ -24,4 +24,6 @@ public class QueryInstanceRequest { private Long instanceId; private Long jobId; private Long wfInstanceId; + + private String status; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/QueryWorkflowInstanceRequest.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/QueryWorkflowInstanceRequest.java index f566f41b..62128579 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/QueryWorkflowInstanceRequest.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/QueryWorkflowInstanceRequest.java @@ -21,4 +21,6 @@ public class QueryWorkflowInstanceRequest { // 查询条件(NORMAL/WORKFLOW) private Long wfInstanceId; private Long workflowId; + + private String status; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/SaveContainerInfoRequest.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/SaveContainerInfoRequest.java index 5e2536c7..b56837c1 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/SaveContainerInfoRequest.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/SaveContainerInfoRequest.java @@ -1,7 +1,8 @@ package com.github.kfcfans.powerjob.server.web.request; +import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType; -import com.github.kfcfans.powerjob.server.common.constans.ContainerStatus; +import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import lombok.Data; /** @@ -28,5 +29,11 @@ public class SaveContainerInfoRequest { private String sourceInfo; // 状态,枚举值为 ContainerStatus(ENABLE/DISABLE) - private ContainerStatus status; + private SwitchableStatus status; + + public void valid() { + CommonUtils.requireNonNull(containerName, "containerName can't be empty"); + CommonUtils.requireNonNull(appId, "appId can't be empty"); + CommonUtils.requireNonNull(sourceInfo, "sourceInfo can't be empty"); + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/SystemOverviewVO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/SystemOverviewVO.java index 39e8c1ab..3e25dfbd 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/SystemOverviewVO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/SystemOverviewVO.java @@ -13,6 +13,8 @@ public class SystemOverviewVO { private long jobCount; private long runningInstanceCount; private long failedInstanceCount; + // 服务器时区 + private String timezone; // 服务器时间 - private long serverTime; + private String serverTime; } diff --git a/powerjob-server/src/main/resources/application-daily.properties b/powerjob-server/src/main/resources/application-daily.properties index be9b0491..87036311 100644 --- a/powerjob-server/src/main/resources/application-daily.properties +++ b/powerjob-server/src/main/resources/application-daily.properties @@ -3,7 +3,7 @@ logging.config=classpath:logback-dev.xml ####### 数据库配置 ####### spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver -spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/powerjob-daily?useUnicode=true&characterEncoding=UTF-8 +spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8 spring.datasource.core.username=root spring.datasource.core.password=No1Bug2Please3! spring.datasource.core.hikari.maximum-pool-size=20 @@ -21,8 +21,11 @@ spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true ####### 资源清理配置 ####### -oms.log.retention.local=0 -oms.log.retention.remote=0 -oms.container.retention.local=0 -oms.container.retention.remote=0 -oms.instanceinfo.retention=0 \ No newline at end of file +oms.log.retention.local=1 +oms.log.retention.remote=1 +oms.container.retention.local=1 +oms.container.retention.remote=-1 +oms.instanceinfo.retention=1 + +####### 缓存配置 ####### +oms.instance.metadata.cache.size=1024 \ No newline at end of file diff --git a/powerjob-server/src/main/resources/application-pre.properties b/powerjob-server/src/main/resources/application-pre.properties index 4cad8276..5e2291d8 100644 --- a/powerjob-server/src/main/resources/application-pre.properties +++ b/powerjob-server/src/main/resources/application-pre.properties @@ -3,7 +3,7 @@ logging.config=classpath:logback-product.xml ####### 数据库配置 ####### spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver -spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/powerjob-pre?useUnicode=true&characterEncoding=UTF-8 +spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-pre?useUnicode=true&characterEncoding=UTF-8 spring.datasource.core.username=root spring.datasource.core.password=No1Bug2Please3! spring.datasource.core.hikari.maximum-pool-size=20 @@ -25,4 +25,7 @@ oms.log.retention.local=3 oms.log.retention.remote=3 oms.container.retention.local=3 oms.container.retention.remote=-1 -oms.instanceinfo.retention=3 \ No newline at end of file +oms.instanceinfo.retention=3 + +####### 缓存配置 ####### +oms.instance.metadata.cache.size=1024 \ No newline at end of file diff --git a/powerjob-server/src/main/resources/application-product.properties b/powerjob-server/src/main/resources/application-product.properties index ed1e8d00..8df29624 100644 --- a/powerjob-server/src/main/resources/application-product.properties +++ b/powerjob-server/src/main/resources/application-product.properties @@ -25,4 +25,7 @@ oms.log.retention.local=7 oms.log.retention.remote=7 oms.container.retention.local=7 oms.container.retention.remote=-1 -oms.instanceinfo.retention=3 \ No newline at end of file +oms.instanceinfo.retention=3 + +####### 缓存配置 ####### +oms.instance.metadata.cache.size=2048 \ No newline at end of file diff --git a/powerjob-server/src/main/resources/static/index.html b/powerjob-server/src/main/resources/static/index.html index 0d8ada21..8d933786 100644 --- a/powerjob-server/src/main/resources/static/index.html +++ b/powerjob-server/src/main/resources/static/index.html @@ -5,7 +5,7 @@ - oms-console + PowerJob