[release] v3.3.0

This commit is contained in:
tjq 2020-10-08 21:53:16 +08:00
commit 9dbfb163b4
53 changed files with 341 additions and 449 deletions

View File

@ -1,105 +0,0 @@
# 2020.4.8 First Round of Testing
## Test Cases
* MapReduce task: http://localhost:7700/job/save?appId=1&concurrency=5&executeType=MAP_REDUCE&groupName=null&instanceRetryNum=3&instanceTimeLimit=4545454545&jobDescription=jobDescription&jobName=testJob&jobParams=%7B%22a%22%3A%22b%22%7D&maxInstanceNum=1&processorInfo=com.github.kfcfans.powerjob.processors.TestMapReduceProcessor&processorType=EMBEDDED_JAVA&status=1&taskRetryNum=3&taskTimeLimit=564465656&timeExpression=0%20*%20*%20*%20*%20%3F%20&timeExpressionType=CRON
## Issue Log
#### Task executed successfully, but resource release failed
After the first task finished, deleting all records from the local H2 database during the resource-release phase threw an error. Stack trace:
```text
2020-04-08 10:09:19 INFO - [ProcessorTracker-1586311659084] mission complete, ProcessorTracker already destroyed!
2020-04-08 10:09:19 ERROR - [TaskPersistenceService] deleteAllTasks failed, instanceId=1586311659084.
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at CommonUtils.executeWithRetry(CommonUtils.java:34)
at TaskPersistenceService.execute(TaskPersistenceService.java:297)
at TaskPersistenceService.deleteAllTasks(TaskPersistenceService.java:269)
at CommonTaskTracker.destroy(TaskTracker.java:231)
at CommonTaskTracker$StatusCheckRunnable.innerRun(TaskTracker.java:421)
at CommonTaskTracker$StatusCheckRunnable.run(TaskTracker.java:467)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-04-08 10:09:19 WARN - [TaskTracker-1586311659084] delete tasks from database failed.
2020-04-08 10:09:19 INFO - [TaskTracker-1586311659084] TaskTracker has left the world.
```
Subsequently, the second task dispatched by the server also failed to be created. Stack trace:
```text
2020-04-08 10:10:08 ERROR - [TaskPersistenceService] save taskTaskDO{taskId='0', jobId='1', instanceId='1586311804030', taskName='OMS_ROOT_TASK', address='10.37.129.2:2777', status=1, result='null', failedCnt=0, createdTime=1586311808295, lastModifiedTime=1586311808295} failed.
2020-04-08 10:10:08 ERROR - [TaskTracker-1586311804030] create root task failed.
[ERROR] [04/08/2020 10:10:08.511] [oms-akka.actor.internal-dispatcher-20] [akka://oms/user/task_tracker] create root task failed.
java.lang.RuntimeException: create root task failed.
at CommonTaskTracker.persistenceRootTask(TaskTracker.java:208)
at CommonTaskTracker.<init>(TaskTracker.java:81)
at TaskTrackerActor.lambda$onReceiveServerScheduleJobReq$2(TaskTrackerActor.java:138)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
at TaskTrackerPool.atomicCreateTaskTracker(TaskTrackerPool.java:30)
at TaskTrackerActor.onReceiveServerScheduleJobReq(TaskTrackerActor.java:138)
```
***
Cause and fix: destroy() called scheduledPool.shutdownNow(), which forcibly shut down the very thread pool whose thread was executing the method, so the method itself was interrupted mid-run. The delete stopped halfway through, leaving the table in a broken state, so the subsequent inserts naturally failed as well. A sketch of the safer shutdown pattern follows.
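A minimal sketch of the safer pattern, assuming destroy() is invoked from outside the pool (all names here are illustrative, not the actual PowerJob code):
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a task must never call shutdownNow() on its own pool,
// or it interrupts itself mid-operation -- exactly the stack trace above.
public class GracefulDestroy {
    private final ExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();

    public void destroy() {
        scheduledPool.shutdown(); // stop accepting new tasks; running ones may finish
        try {
            // give deleteAllTasks() time to complete instead of interrupting it mid-delete
            if (!scheduledPool.awaitTermination(10, TimeUnit.SECONDS)) {
                scheduledPool.shutdownNow(); // force-interrupt only as a last resort
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```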
# 2020.4.11 "Cluster" Testing
#### Task retry mechanism broken
Cause: the SQL now() function returns a Datetime, which can't be received as an int/bigint...
#### SystemMetric scoring issue
Problem: java.lang.management.OperatingSystemMXBean#getSystemLoadAverage does not reliably return the current CPU load; it may return a negative value, which means "unavailable"...
Fix: on Windows, getSystemLoadAverage() always returns -1... what a trap... add a defensive check for now and keep testing (see the sketch below)...
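A minimal sketch of such a guard, assuming a clamp to zero is an acceptable fallback score (class and method names are illustrative):
```java
import java.lang.management.ManagementFactory;

// Hypothetical guard: getSystemLoadAverage() returns a negative value when the
// load is unavailable (e.g. always -1 on Windows), so clamp it rather than
// feeding a negative load into the score calculation.
public class SystemMetricsUtils {
    public static double safeSystemLoad() {
        double load = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
        return load < 0 ? 0.0 : load; // treat "unavailable" as zero load
    }
}
```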
#### Unexplained IndexOutOfBoundsException (possibly a database performance issue)
Problem: on the fourth execution of a second-level Broadcast task, when the Processor finished and reported its status, the TaskTracker threw an error. The root cause is that the record for this task could not be found in the database...
Scenario: time expression FIX_DELAY, whose corresponding TaskTracker is FrequentTaskTracker
Stack trace:
```text
2020-04-16 18:05:09 ERROR - [TaskPersistenceService] getTaskStatus failed, instanceId=1586857062542,taskId=4.
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.LinkedList.checkElementIndex(LinkedList.java:555)
at java.util.LinkedList.get(LinkedList.java:476)
at TaskPersistenceService.lambda$getTaskStatus$10(TaskPersistenceService.java:214)
at CommonUtils.executeWithRetry(CommonUtils.java:37)
at TaskPersistenceService.execute(TaskPersistenceService.java:310)
at TaskPersistenceService.getTaskStatus(TaskPersistenceService.java:212)
at TaskTracker.updateTaskStatus(TaskTracker.java:107)
at TaskTracker.broadcast(TaskTracker.java:214)
at TaskTrackerActor.onReceiveBroadcastTaskPreExecuteFinishedReq(TaskTrackerActor.java:106)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:187)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:186)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:241)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242)
at akka.actor.Actor.aroundReceive(Actor.scala:534)
at akka.actor.Actor.aroundReceive$(Actor.scala:532)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
at akka.actor.ActorCell.invoke(ActorCell.scala:543)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
2020-04-16 18:05:09 WARN - [TaskTracker-1586857062542] query TaskStatus from DB failed when try to update new TaskStatus(taskId=4,newStatus=6).
```
Fix: the initial suspicion is that during back-to-back updates, database locks make the row invisible (H2's exact semantics here are unclear). Updates to the same taskId therefore have to be serialized -> synchronized works. Yes! A sketch follows.
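An illustrative sketch of that serialization, using one lock object per taskId (names hypothetical; the actual fix may simply synchronize the update method):
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: funnel all status updates for the same taskId through
// one lock, so every read sees the preceding write for that task.
public class TaskStatusUpdater {
    private final Map<String, Object> taskLocks = new ConcurrentHashMap<>();

    public void updateTaskStatus(String taskId, int newStatus) {
        Object lock = taskLocks.computeIfAbsent(taskId, id -> new Object());
        synchronized (lock) {
            // read the current status, then write the new one -- serialized per
            // taskId, so the row written by the previous update is always visible
            // ... H2 SELECT + UPDATE would go here ...
        }
    }
}
```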
# 2020.4.20 Pre-1.0.0 Release Testing
#### Server & Worker
* Run on a designated machine -> verified
* Execution of Map/MapReduce/Standalone/Broadcast/Shell/Python processors -> verified
* Timeout failure -> verified
* Destructive test: configure a wrong processor -> found a deadlock (TT creates PT; PT creation fails, so PT can never report heartbeats; having heard no PT heartbeat for too long, TT declares PT dead (it genuinely is); no usable PT remains to re-dispatch the task to; deadlock, GG T_T). Fixed by guaranteeing that the ProcessorTracker is always created successfully: if building the processor fails, every task submitted afterwards immediately returns an error.
#### Client
* StopInstance -> success
* FetchInstanceStatus -> success

View File

@ -1,34 +0,0 @@
# Container Test Log
## ClassNotFound issue
> Playing with hot-loading just wouldn't feel right without a few ClassNotFound errors [grin]
While testing a containerized MapReduce task, the following error appeared:
```text
2020-05-19 09:33:18 ERROR - [ProcessorRunnable-142925055284740224] execute failed, please fix this bug @tjq!
com.esotericsoftware.kryo.KryoException: Unable to find class: cn.edu.zju.oms.container.ContainerMRProcessor$TestSubTask
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:182)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:151)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:684)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:795)
at SerializerUtils.deSerialized(SerializerUtils.java:48)
at ProcessorRunnable.innerRun(ProcessorRunnable.java:63)
at ProcessorRunnable.run(ProcessorRunnable.java:179)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: cn.edu.zju.oms.container.ContainerMRProcessor$TestSubTask
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:176)
... 12 common frames omitted
```
* Root cause: analysis showed that, for performance, the framework used an **object pool** during serialization and deserialization (archived code: a14f554e0085b6a179375a8ca04665434b73c7bd#SerializerUtils). Kryo only ever uses the fixed class loader captured when the Kryo instance was created (the loader of Kryo.class), so it cannot find container classes created by OMS's custom class loader.
* Fix: drop the high-performance object pool in favor of ThreadLocal plus manually setting Kryo's class loader, as sketched below.
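A minimal sketch of that fix, using Kryo's setClassLoader API (holder class name is illustrative):
```java
import com.esotericsoftware.kryo.Kryo;

// One Kryo instance per thread (instead of an object pool), with the class
// loader pointed at the thread's context class loader so classes loaded by
// the container's custom class loader can be resolved during deserialization.
public class KryoHolder {
    private static final ThreadLocal<Kryo> KRYO = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        // without this, Kryo falls back to the loader of Kryo.class and throws
        // ClassNotFoundException for container classes, as in the log above
        kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
        return kryo;
    });

    public static Kryo get() {
        return KRYO.get();
    }
}
```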

View File

@ -1,100 +0,0 @@
## V1.0.0
#### Persistence path
1. The client reports to the server asynchronously, in batches, via an in-memory queue
2. On receiving a request, the server blindly writes to the H2 database
3. After the task finishes, the data is streamed into MongoDB for persistent storage (maintained as a MongoDB document containing an Array)
4. Once the sync completes, all local data is deleted
#### Query path
* If the data exists locally, return it straight from the local database
* Otherwise, fetch it directly from MongoDB and return that (see the sketch below)
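An illustrative sketch of this two-tier lookup (all names hypothetical; the lookups are injected as functions to keep the sketch self-contained):
```java
import java.util.Optional;
import java.util.function.LongFunction;

// Serve from the local H2 store when present, otherwise fall through to MongoDB.
public class LogQueryPath {
    private final LongFunction<Optional<String>> localH2; // local lookup
    private final LongFunction<Optional<String>> mongo;   // remote lookup

    public LogQueryPath(LongFunction<Optional<String>> localH2,
                        LongFunction<Optional<String>> mongo) {
        this.localH2 = localH2;
        this.mongo = mongo;
    }

    public Optional<String> query(long instanceId) {
        Optional<String> local = localH2.apply(instanceId);
        return local.isPresent() ? local : mongo.apply(instanceId);
    }
}
```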
***
The problem is mainly front-end display: in a test with 1,000,000 records, local H2 used 82 MB (MongoDB usage unknown, since the mongo shell inexplicably wouldn't work..., though it can't be much smaller). At that size the data cannot even be sent back to the browser... the approach needs rework.
```text
org.apache.catalina.connector.ClientAbortException: java.io.IOException: Broken pipe
at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:351)
at org.apache.catalina.connector.OutputBuffer.flushByteBuffer(OutputBuffer.java:776)
at org.apache.catalina.connector.OutputBuffer.append(OutputBuffer.java:681)
at org.apache.catalina.connector.OutputBuffer.writeBytes(OutputBuffer.java:386)
at org.apache.catalina.connector.OutputBuffer.write(OutputBuffer.java:364)
at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:96)
at com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:2137)
at com.fasterxml.jackson.core.json.UTF8JsonGenerator.flush(UTF8JsonGenerator.java:1150)
at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:923)
at org.springframework.http.converter.json.AbstractJackson2HttpMessageConverter.writeInternal(AbstractJackson2HttpMessageConverter.java:287)
at org.springframework.http.converter.AbstractGenericHttpMessageConverter.write(AbstractGenericHttpMessageConverter.java:104)
at org.springframework.web.servlet.mvc.method.annotation.AbstractMessageConverterMethodProcessor.writeWithMessageConverters(AbstractMessageConverterMethodProcessor.java:287)
at org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor.handleReturnValue(RequestResponseBodyMethodProcessor.java:181)
at org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite.handleReturnValue(HandlerMethodReturnValueHandlerComposite.java:82)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:123)
at org.springframework.web.servlet.mvc.method.annotation.ExceptionHandlerExceptionResolver.doResolveHandlerMethodException(ExceptionHandlerExceptionResolver.java:403)
at org.springframework.web.servlet.handler.AbstractHandlerMethodExceptionResolver.doResolveException(AbstractHandlerMethodExceptionResolver.java:61)
at org.springframework.web.servlet.handler.AbstractHandlerExceptionResolver.resolveException(AbstractHandlerExceptionResolver.java:141)
at org.springframework.web.servlet.handler.HandlerExceptionResolverComposite.resolveException(HandlerExceptionResolverComposite.java:80)
at org.springframework.web.servlet.DispatcherServlet.processHandlerException(DispatcherServlet.java:1300)
at org.springframework.web.servlet.DispatcherServlet.processDispatchResult(DispatcherServlet.java:1111)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1057)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:373)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1594)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:138)
at org.apache.tomcat.util.net.NioBlockingSelector.write(NioBlockingSelector.java:101)
at org.apache.tomcat.util.net.NioSelectorPool.write(NioSelectorPool.java:152)
at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1253)
at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:740)
at org.apache.tomcat.util.net.SocketWrapperBase.writeBlocking(SocketWrapperBase.java:560)
at org.apache.tomcat.util.net.SocketWrapperBase.write(SocketWrapperBase.java:504)
at org.apache.coyote.http11.Http11OutputBuffer$SocketOutputBuffer.doWrite(Http11OutputBuffer.java:538)
at org.apache.coyote.http11.filters.ChunkedOutputFilter.doWrite(ChunkedOutputFilter.java:110)
at org.apache.coyote.http11.Http11OutputBuffer.doWrite(Http11OutputBuffer.java:190)
at org.apache.coyote.Response.doWrite(Response.java:601)
at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:339)
... 60 common frames omitted
```
## V2.0.0
>After a little digging, MongoDB appears to let users store files directly on its file system, GridFS. So should we switch to a file-for-file model? When a sync starts, first generate the complete log file locally, then sync it to MongoDB; on query, download the file first. Once the complete file is available, pagination also becomes easy (the front end shows, say, 1000 lines at a time). A sketch of this idea follows.
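A sketch of how that file-for-file flow could look with the MongoDB Java driver's GridFS API (bucket name, file naming scheme, and class are assumptions, not the eventual implementation):
```java
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

// Write the complete local log file to GridFS after the instance finishes,
// and download it again on query; pagination then happens on the local file.
public class GridFsLogStore {
    private final GridFSBucket bucket;

    public GridFsLogStore(String uri, String db) {
        MongoDatabase database = MongoClients.create(uri).getDatabase(db);
        this.bucket = GridFSBuckets.create(database, "instance-logs");
    }

    public void upload(long instanceId, String localPath) throws Exception {
        try (InputStream in = new FileInputStream(localPath)) {
            bucket.uploadFromStream("log-" + instanceId, in);
        }
    }

    public void download(long instanceId, String localPath) throws Exception {
        try (OutputStream out = new FileOutputStream(localPath)) {
            bucket.downloadToStream("log-" + instanceId, out);
        }
    }
}
```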

View File

@ -1,15 +1,17 @@
/*
Navicat Premium Data Transfer
Source Server : Local MySQL
Source Server Type : MySQL
Source Server Version : 80020
Source Schema : powerjob-product
Source Server Version : 80021
Source Host : localhost:3306
Source Schema : powerjob-daily
Target Server Type : MySQL
Target Server Version : 80020
Target Server Version : 80021
File Encoding : 65001
Date: 23/06/2020 22:30:06
Date: 08/10/2020 12:39:10
*/
SET NAMES utf8mb4;
@ -28,7 +30,7 @@ CREATE TABLE `app_info` (
`password` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `appNameUK` (`app_name`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for container_info
@ -62,10 +64,10 @@ CREATE TABLE `instance_info` (
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
`instance_id` bigint DEFAULT NULL,
`instance_params` text,
`instance_params` longtext,
`job_id` bigint DEFAULT NULL,
`last_report_time` bigint DEFAULT NULL,
`result` text,
`result` longtext,
`running_times` bigint DEFAULT NULL,
`status` int DEFAULT NULL,
`task_tracker_address` varchar(255) DEFAULT NULL,
@ -75,7 +77,7 @@ CREATE TABLE `instance_info` (
KEY `IDX5b1nhpe5je7gc5s1ur200njr7` (`job_id`),
KEY `IDXjnji5lrr195kswk6f7mfhinrs` (`app_id`),
KEY `IDXa98hq3yu0l863wuotdjl7noum` (`instance_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for job_info
@ -101,7 +103,7 @@ CREATE TABLE `job_info` (
`min_memory_space` double NOT NULL,
`next_trigger_time` bigint DEFAULT NULL,
`notify_user_ids` varchar(255) DEFAULT NULL,
`processor_info` text,
`processor_info` longtext,
`processor_type` int DEFAULT NULL,
`status` int DEFAULT NULL,
`task_retry_num` int DEFAULT NULL,
@ -109,7 +111,7 @@ CREATE TABLE `job_info` (
`time_expression_type` int DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `IDXk2xprmn3lldmlcb52i36udll1` (`app_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for oms_lock
@ -124,7 +126,7 @@ CREATE TABLE `oms_lock` (
`ownerip` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `lockNameUK` (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for server_info
@ -151,6 +153,8 @@ CREATE TABLE `user_info` (
`password` varchar(255) DEFAULT NULL,
`phone` varchar(255) DEFAULT NULL,
`username` varchar(255) DEFAULT NULL,
`extra` varchar(255) DEFAULT NULL,
`web_hook` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
@ -166,7 +170,7 @@ CREATE TABLE `workflow_info` (
`max_wf_instance_num` int DEFAULT NULL,
`next_trigger_time` bigint DEFAULT NULL,
`notify_user_ids` varchar(255) DEFAULT NULL,
`pedag` text,
`pedag` longtext,
`status` int DEFAULT NULL,
`time_expression` varchar(255) DEFAULT NULL,
`time_expression_type` int DEFAULT NULL,
@ -174,7 +178,7 @@ CREATE TABLE `workflow_info` (
`wf_name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `IDX7uo5w0e3beeho3fnx9t7eiol3` (`app_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for workflow_instance_info
@ -184,15 +188,16 @@ CREATE TABLE `workflow_instance_info` (
`id` bigint NOT NULL AUTO_INCREMENT,
`actual_trigger_time` bigint DEFAULT NULL,
`app_id` bigint DEFAULT NULL,
`dag` text,
`dag` longtext,
`finished_time` bigint DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
`result` text,
`result` longtext,
`status` int DEFAULT NULL,
`wf_init_params` longtext,
`wf_instance_id` bigint DEFAULT NULL,
`workflow_id` bigint DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
SET FOREIGN_KEY_CHECKS = 1;

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.2.3</version>
<version>3.3.0</version>
<packaging>jar</packaging>
<properties>
<powerjob.common.version>3.2.3</powerjob.common.version>
<powerjob.common.version>3.3.0</powerjob.common.version>
<junit.version>5.6.1</junit.version>
</properties>

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.client;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.OpenAPIConstant;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
@ -68,7 +68,7 @@ public class OhMyClient {
currentAddress = addr;
break;
}else {
throw new OmsException(resultDTO.getMessage());
throw new PowerJobException(resultDTO.getMessage());
}
}
}catch (IOException ignore) {
@ -76,7 +76,7 @@ public class OhMyClient {
}
if (StringUtils.isEmpty(currentAddress)) {
throw new OmsException("no server available");
throw new PowerJobException("no server available");
}
log.info("[OhMyClient] {}'s oms-client bootstrap successfully, using server: {}", appName, currentAddress);
}
@ -108,7 +108,7 @@ public class OhMyClient {
request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JsonUtils.toJSONStringUnsafe(request);
String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(json, jsonType));
String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json));
return JsonUtils.parseObject(post, ResultDTO.class);
}
@ -283,7 +283,7 @@ public class OhMyClient {
request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JsonUtils.toJSONStringUnsafe(request);
String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(json, jsonType));
String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json));
return JsonUtils.parseObject(post, ResultDTO.class);
}
@ -349,17 +349,26 @@ public class OhMyClient {
/**
* Run a workflow
* @param workflowId workflowId
* @param workflowId workflow ID
* @param initParams startup parameters
* @param delayMS delay time, in milliseconds (ms)
* @return workflow instance ID
* @throws Exception on error
* @throws Exception error details
*/
public ResultDTO<Long> runWorkflow(Long workflowId) throws Exception {
public ResultDTO<Long> runWorkflow(Long workflowId, String initParams, long delayMS) throws Exception {
FormBody.Builder builder = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString());
.add("appId", appId.toString())
.add("delay", String.valueOf(delayMS));
if (StringUtils.isNotEmpty(initParams)) {
builder.add("initParams", initParams);
}
String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build());
return JsonUtils.parseObject(post, ResultDTO.class);
}
public ResultDTO<Long> runWorkflow(Long workflowId) throws Exception {
return runWorkflow(workflowId, null, 0);
}
/* ************* Workflow Instance section ************* */
/**
@ -426,6 +435,6 @@ public class OhMyClient {
}
log.error("[OhMyClient] do post for path: {} failed because of no server available in {}.", path, allAddress);
throw new OmsException("no server available when send post");
throw new PowerJobException("no server available when send post");
}
}

View File

@ -1,7 +1,11 @@
import com.github.kfcfans.powerjob.client.OhMyClient;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@ -20,7 +24,23 @@ public class TestWorkflow {
@BeforeAll
public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test", null);
ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
}
@Test
public void initTestData() throws Exception {
SaveJobInfoRequest base = new SaveJobInfoRequest();
base.setJobName("DAG-Node-");
base.setTimeExpressionType(TimeExpressionType.WORKFLOW);
base.setExecuteType(ExecuteType.STANDALONE);
base.setProcessorType(ProcessorType.EMBEDDED_JAVA);
base.setProcessorInfo("com.github.kfcfans.powerjob.samples.workflow.WorkflowStandaloneProcessor");
for (int i = 0; i < 5; i++) {
SaveJobInfoRequest request = JsonUtils.parseObject(JsonUtils.toBytes(base), SaveJobInfoRequest.class);
request.setJobName(request.getJobName() + i);
System.out.println(ohMyClient.saveJob(request));
}
}
@Test
@ -30,8 +50,8 @@ public class TestWorkflow {
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
nodes.add(new PEWorkflowDAG.Node(1L, "node-1"));
nodes.add(new PEWorkflowDAG.Node(2L, "node-2"));
nodes.add(new PEWorkflowDAG.Node(1L, "DAG-Node-1"));
nodes.add(new PEWorkflowDAG.Node(2L, "DAG-Node-2"));
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
@ -81,4 +101,9 @@ public class TestWorkflow {
public void testFetchWfInstanceInfo() throws Exception {
System.out.println(ohMyClient.fetchWorkflowInstanceInfo(149962433421639744L));
}
@Test
public void testRunWorkflowPlus() throws Exception {
System.out.println(ohMyClient.runWorkflow(1L, "this is init Params 2", 90000));
}
}

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.2.3</version>
<version>3.3.0</version>
<packaging>jar</packaging>
<properties>
@ -18,7 +18,7 @@
<commons.lang.version>3.10</commons.lang.version>
<commons.io.version>2.6</commons.io.version>
<guava.version>29.0-jre</guava.version>
<okhttp.version>4.4.1</okhttp.version>
<okhttp.version>3.14.9</okhttp.version>
<akka.version>2.6.4</akka.version>
<junit.version>5.6.1</junit.version>
</properties>

View File

@ -1,29 +0,0 @@
package com.github.kfcfans.powerjob.common;
/**
* OhMyScheduler runtime exception
*
* @author tjq
* @since 2020/5/26
*/
public class OmsException extends RuntimeException {
public OmsException() {
}
public OmsException(String message) {
super(message);
}
public OmsException(String message, Throwable cause) {
super(message, cause);
}
public OmsException(Throwable cause) {
super(cause);
}
public OmsException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,29 @@
package com.github.kfcfans.powerjob.common;
/**
* PowerJob runtime exception
*
* @author tjq
* @since 2020/5/26
*/
public class PowerJobException extends RuntimeException {
public PowerJobException() {
}
public PowerJobException(String message) {
super(message);
}
public PowerJobException(String message, Throwable cause) {
super(message, cause);
}
public PowerJobException(Throwable cause) {
super(cause);
}
public PowerJobException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -20,12 +20,16 @@ public class InstanceInfoDTO {
private Long appId;
// task instance ID
private Long instanceId;
// workflow instance ID
private Long wfInstanceId;
// task instance parameters
private String instanceParams;
/**
* task status {@link InstanceStatus}
*/
private int status;
// type of this task instance: normal / workflow (InstanceType)
private Integer type;
// execution result
private String result;
// expected trigger time

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.common.utils;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
@ -121,11 +121,11 @@ public class CommonUtils {
public static <T> T requireNonNull(T obj, String msg) {
if (obj == null) {
throw new OmsException(msg);
throw new PowerJobException(msg);
}
if (obj instanceof String) {
if (StringUtils.isEmpty((String) obj)) {
throw new OmsException(msg);
throw new PowerJobException(msg);
}
}
return obj;

View File

@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.common.utils;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import org.apache.commons.lang3.exception.ExceptionUtils;
/**
@ -54,6 +54,6 @@ public class JsonUtils {
}catch (Exception e) {
ExceptionUtils.rethrow(e);
}
throw new OmsException("impossible");
throw new PowerJobException("impossible");
}
}

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.2.3</version>
<version>3.3.0</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.common.version>3.2.3</powerjob.common.version>
<springboot.version>2.3.4.RELEASE</springboot.version>
<powerjob.common.version>3.3.0</powerjob.common.version>
<!-- database driver versions are managed by spring-boot-dependencies -->
<mysql.version>8.0.19</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version>
@ -57,6 +57,11 @@
<artifactId>ojdbc8</artifactId>
<version>${ojdbc.version}</version>
</dependency>
<dependency>
<groupId>com.oracle.database.nls</groupId>
<artifactId>orai18n</artifactId>
<version>${ojdbc.version}</version>
</dependency>
<!-- sqlserver -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>

View File

@ -27,8 +27,8 @@ public class ThreadPoolConfig {
@Bean("omsTimingPool")
public Executor getTimingPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 16);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 32);
// use SynchronousQueue
executor.setQueueCapacity(0);
executor.setKeepAliveSeconds(60);
@ -44,8 +44,8 @@ public class ThreadPoolConfig {
@Bean("omsBackgroundPool")
public Executor initBackgroundPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 8);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 16);
executor.setQueueCapacity(8192);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsBackgroundPool-");

View File

@ -7,7 +7,7 @@ import com.dingtalk.api.request.OapiMessageCorpconversationAsyncsendV2Request;
import com.dingtalk.api.request.OapiUserGetByMobileRequest;
import com.dingtalk.api.response.OapiGettokenResponse;
import com.dingtalk.api.response.OapiUserGetByMobileResponse;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@ -52,7 +52,7 @@ public class DingTalkUtils implements Closeable {
refreshAccessToken(appKey, appSecret);
if (StringUtils.isEmpty(accessToken)) {
throw new OmsException("fetch AccessToken failed, please check your appKey & appSecret");
throw new PowerJobException("fetch AccessToken failed, please check your appKey & appSecret");
}
scheduledPool = Executors.newSingleThreadScheduledExecutor();
@ -91,7 +91,7 @@ public class DingTalkUtils implements Closeable {
return execute.getUserid();
}
log.info("[DingTalkUtils] fetch userId by mobile({}) failed,reason is {}.", mobile, execute.getErrmsg());
throw new OmsException("fetch userId by phone number failed, reason is " + execute.getErrmsg());
throw new PowerJobException("fetch userId by phone number failed, reason is " + execute.getErrmsg());
}
public void sendMarkdownAsync(String title, List<MarkdownEntity> entities, String userList, Long agentId) throws Exception {

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.server.common.utils;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.server.model.WorkflowDAG;
@ -76,7 +76,7 @@ public class WorkflowDAGUtils {
Map<Long, WorkflowDAG.Node> id2Node = Maps.newHashMap();
if (PEWorkflowDAG.getNodes() == null || PEWorkflowDAG.getNodes().isEmpty()) {
throw new OmsException("empty graph");
throw new PowerJobException("empty graph");
}
// create the nodes
@ -95,7 +95,7 @@ public class WorkflowDAGUtils {
WorkflowDAG.Node to = id2Node.get(edge.getTo());
if (from == null || to == null) {
throw new OmsException("Illegal Edge: " + JsonUtils.toJSONString(edge));
throw new PowerJobException("Illegal Edge: " + JsonUtils.toJSONString(edge));
}
from.getSuccessors().add(to);
@ -106,7 +106,7 @@ public class WorkflowDAGUtils {
// validity check: at least one root vertex must exist
if (rootIds.size() < 1) {
throw new OmsException("Illegal DAG: " + JsonUtils.toJSONString(PEWorkflowDAG));
throw new PowerJobException("Illegal DAG: " + JsonUtils.toJSONString(PEWorkflowDAG));
}
List<WorkflowDAG.Node> roots = Lists.newLinkedList();

View File

@ -29,6 +29,11 @@ public class UserInfoDO {
private String phone;
// email address
private String email;
// webHook
private String webHook;
// extra (extension field)
private String extra;
private Date gmtCreate;
private Date gmtModified;

View File

@ -36,6 +36,11 @@ public class WorkflowInstanceInfoDO {
// workflow status (WorkflowInstanceStatus)
private Integer status;
// workflow startup parameters
@Lob
@Column
private String wfInitParams;
@Lob
@Column
private String dag;

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.powerjob.server.service;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import org.springframework.stereotype.Service;
@ -28,11 +28,11 @@ public class AppInfoService {
*/
public Long assertApp(String appName, String password) {
AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new OmsException("can't find appInfo by appName: " + appName));
AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new PowerJobException("can't find appInfo by appName: " + appName));
if (Objects.equals(appInfo.getPassword(), password)) {
return appInfo.getId();
}
throw new OmsException("password error!");
throw new PowerJobException("password error!");
}
}

View File

@ -193,7 +193,7 @@ public class InstanceLogService {
try {
instanceId2LastReportTime.remove(instanceId);
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
log.warn("[InstanceLog-{}] delete local instanceLog successfully.", instanceId);
log.info("[InstanceLog-{}] delete local instanceLog successfully.", instanceId);
}catch (Exception e) {
log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.service;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
@ -78,7 +79,7 @@ public class JobService {
jobInfoDO.setNotifyUserIds(SJ.commaJoiner.join(request.getNotifyUserIds()));
}
refreshJob(jobInfoDO);
calculateNextTriggerTime(jobInfoDO);
if (request.getId() == null) {
jobInfoDO.setGmtCreate(new Date());
}
@ -143,7 +144,7 @@ public class JobService {
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId));
jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV());
refreshJob(jobInfoDO);
calculateNextTriggerTime(jobInfoDO);
jobInfoRepository.saveAndFlush(jobInfoDO);
}
@ -184,7 +185,7 @@ public class JobService {
});
}
private void refreshJob(JobInfoDO jobInfoDO) throws Exception {
private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws Exception {
// calculate the next trigger time
Date now = new Date();
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
@ -192,6 +193,9 @@ public class JobService {
if (timeExpressionType == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(now);
if (nextValidTime == null) {
throw new PowerJobException("cron expression is out of date: " + jobInfoDO.getTimeExpression());
}
jobInfoDO.setNextTriggerTime(nextValidTime.getTime());
}else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) {
jobInfoDO.setTimeExpression(null);

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.server.service.alarm.impl;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.SJ;
@ -55,7 +55,7 @@ public class DingTalkAlarmService implements Alarmable {
String userId = mobile2UserIdCache.get(user.getPhone(), () -> {
try {
return dingTalkUtils.fetchUserIdByMobile(user.getPhone());
} catch (OmsException ignore) {
} catch (PowerJobException ignore) {
return EMPTY_TAG;
} catch (Exception ignore) {
return null;

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.ha;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.akka.requests.Ping;
@ -56,7 +56,7 @@ public class ServerSelectService {
// fetch the current server from the database without locking
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
if (!appInfoOpt.isPresent()) {
throw new OmsException(appId + " is not registered!");
throw new PowerJobException(appId + " is not registered!");
}
String appName = appInfoOpt.get().getAppName();
String originServer = appInfoOpt.get().getCurrentServer();

View File

@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.server.service.instance;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.SystemInstanceResult;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
@ -135,11 +135,11 @@ public class InstanceService {
public void retryInstance(Long instanceId) {
InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
if (!InstanceStatus.finishedStatus.contains(instanceInfo.getStatus())) {
throw new OmsException("Only stopped instance can be retry!");
throw new PowerJobException("Only stopped instance can be retry!");
}
// retrying workflow instances is not supported yet
if (instanceInfo.getWfInstanceId() != null) {
throw new OmsException("Workflow's instance do not support retry!");
throw new PowerJobException("Workflow's instance do not support retry!");
}
instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
@ -152,7 +152,7 @@ public class InstanceService {
// dispatch the task
Long jobId = instanceInfo.getJobId();
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new OmsException("can't find job info by jobId: " + jobId));
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new PowerJobException("can't find job info by jobId: " + jobId));
dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes());
}
@ -187,7 +187,7 @@ public class InstanceService {
log.info("[Instance-{}] cancel the instance successfully.", instanceId);
}else {
log.warn("[Instance-{}] cancel the instance failed.", instanceId);
throw new OmsException("instance already up and running");
throw new PowerJobException("instance already up and running");
}
}catch (Exception e) {

View File

@ -171,7 +171,7 @@ public class InstanceStatusCheckService {
waitingWfInstanceList.forEach(wfInstance -> {
Optional<WorkflowInfoDO> workflowOpt = workflowInfoRepository.findById(wfInstance.getWorkflowId());
workflowOpt.ifPresent(workflowInfo -> {
workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId());
workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId(), wfInstance.getWfInitParams());
log.info("[Workflow-{}|{}] restart workflowInstance successfully~", workflowInfo.getId(), wfInstance.getWfInstanceId());
});
});

View File

@ -121,8 +121,6 @@ public class OmsScheduleService {
*/
private void scheduleCronJob(List<Long> appIds) {
Date now = new Date();
long nowTime = System.currentTimeMillis();
long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
@ -165,24 +163,13 @@ public class OmsScheduleService {
});
// 3. calculate the next trigger time (ignore repeats within 5s, i.e. in CRON mode the minimum consecutive interval is SCHEDULE_RATE ms)
List<JobInfoDO> updatedJobInfos = Lists.newLinkedList();
jobInfos.forEach(jobInfoDO -> {
try {
Date nextTriggerTime = calculateNextTriggerTime(jobInfoDO.getNextTriggerTime(), jobInfoDO.getTimeExpression());
JobInfoDO updatedJobInfo = new JobInfoDO();
BeanUtils.copyProperties(jobInfoDO, updatedJobInfo);
updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime());
updatedJobInfo.setGmtModified(now);
updatedJobInfos.add(updatedJobInfo);
refreshJob(jobInfoDO);
} catch (Exception e) {
log.error("[Job-{}] calculate next trigger time failed.", jobInfoDO.getId(), e);
log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e);
}
});
jobInfoRepository.saveAll(updatedJobInfos);
jobInfoRepository.flush();
@ -203,11 +190,10 @@ public class OmsScheduleService {
return;
}
Date now = new Date();
wfInfos.forEach(wfInfo -> {
// 1. create the schedule record first, to prevent the case where nothing gets scheduled
Long wfInstanceId = workflowInstanceManager.create(wfInfo);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, null);
// 2. push into the time wheel, ready for scheduled execution
long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis();
@ -215,20 +201,13 @@ public class OmsScheduleService {
log.warn("[Workflow-{}] workflow schedule delay, expect:{}, actual: {}", wfInfo.getId(), wfInfo.getNextTriggerTime(), System.currentTimeMillis());
delay = 0;
}
InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId));
InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, null));
// 3. recalculate the next trigger time and update it
try {
Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression());
WorkflowInfoDO updateEntity = new WorkflowInfoDO();
BeanUtils.copyProperties(wfInfo, updateEntity);
updateEntity.setNextTriggerTime(nextTriggerTime.getTime());
updateEntity.setGmtModified(now);
workflowInfoRepository.save(updateEntity);
refreshWorkflow(wfInfo);
}catch (Exception e) {
log.error("[Workflow-{}] parse cron failed.", wfInfo.getId(), e);
log.error("[Workflow-{}] refresh workflow failed.", wfInfo.getId(), e);
}
});
workflowInfoRepository.flush();
@ -264,6 +243,40 @@ public class OmsScheduleService {
});
}
private void refreshJob(JobInfoDO jobInfo) throws Exception {
Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression());
JobInfoDO updatedJobInfo = new JobInfoDO();
BeanUtils.copyProperties(jobInfo, updatedJobInfo);
if (nextTriggerTime == null) {
log.warn("[Job-{}] this job won't be scheduled anymore, system will set the status to DISABLE!", jobInfo.getId());
updatedJobInfo.setStatus(SwitchableStatus.DISABLE.getV());
}else {
updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime());
}
updatedJobInfo.setGmtModified(new Date());
jobInfoRepository.save(updatedJobInfo);
}
private void refreshWorkflow(WorkflowInfoDO wfInfo) throws Exception {
Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression());
WorkflowInfoDO updateEntity = new WorkflowInfoDO();
BeanUtils.copyProperties(wfInfo, updateEntity);
if (nextTriggerTime == null) {
log.warn("[Workflow-{}] this workflow won't be scheduled anymore, system will set the status to DISABLE!", wfInfo.getId());
wfInfo.setStatus(SwitchableStatus.DISABLE.getV());
}else {
updateEntity.setNextTriggerTime(nextTriggerTime.getTime());
}
updateEntity.setGmtModified(new Date());
workflowInfoRepository.save(updateEntity);
}
/**
* Calculate the next trigger time
* @param preTriggerTime the previous trigger time

View File

@ -67,9 +67,10 @@ public class WorkflowInstanceManager {
/**
* Create a workflow instance
* @param wfInfo workflow metadata
* @param initParams startup parameters
* @return wfInstanceId
*/
public Long create(WorkflowInfoDO wfInfo) {
public Long create(WorkflowInfoDO wfInfo, String initParams) {
Long wfId = wfInfo.getId();
Long wfInstanceId = idGenerateService.allocate();
@ -82,6 +83,7 @@ public class WorkflowInstanceManager {
newWfInstance.setWorkflowId(wfId);
newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
newWfInstance.setActualTriggerTime(System.currentTimeMillis());
newWfInstance.setWfInitParams(initParams);
newWfInstance.setGmtCreate(now);
newWfInstance.setGmtModified(now);
@ -107,8 +109,9 @@ public class WorkflowInstanceManager {
* Start the workflow
* @param wfInfo workflow info
* @param wfInstanceId workflow instance ID
* @param initParams startup parameters
*/
public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) {
public void start(WorkflowInfoDO wfInfo, Long wfInstanceId, String initParams) {
Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
if (!wfInstanceInfoOpt.isPresent()) {
@ -132,13 +135,19 @@ public class WorkflowInstanceManager {
try {
// build the root task's startup parameters (to keep the worker-side implementation lean, startup parameters are still carried in the instanceParams field)
Map<String, String> preJobId2Result = Maps.newHashMap();
// mimic the preJobId -> preJobResult format; -1 means there is no predecessor task
preJobId2Result.put("-1", initParams);
String wfRootInstanceParams = JSONObject.toJSONString(preJobId2Result);
PEWorkflowDAG peWorkflowDAG = JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
List<PEWorkflowDAG.Node> roots = WorkflowDAGUtils.listRoots(peWorkflowDAG);
peWorkflowDAG.getNodes().forEach(node -> node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()));
// create all root tasks
roots.forEach(root -> {
Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), null, wfInstanceId, System.currentTimeMillis());
Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), wfRootInstanceParams, wfInstanceId, System.currentTimeMillis());
root.setInstanceId(instanceId);
root.setStatus(InstanceStatus.RUNNING.getV());
@ -152,7 +161,7 @@ public class WorkflowInstanceManager {
log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId);
// actually start executing the root tasks
roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, null));
roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, wfRootInstanceParams));
}catch (Exception e) {
log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.workflow;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.SystemInstanceResult;
import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
@ -43,7 +43,7 @@ public class WorkflowInstanceService {
public void stopWorkflowInstance(Long wfInstanceId, Long appId) {
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
throw new OmsException("workflow instance already stopped");
throw new PowerJobException("workflow instance already stopped");
}
// stop all started-but-unfinished instances
PEWorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
@ -80,7 +80,7 @@ public class WorkflowInstanceService {
public WorkflowInstanceInfoDO fetchWfInstance(Long wfInstanceId, Long appId) {
WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId));
if (!Objects.equals(appId, wfInstance.getAppId())) {
throw new OmsException("Permission Denied!");
throw new PowerJobException("Permission Denied!");
}
return wfInstance;
}

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.server.service.workflow;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.WorkflowInfoDTO;
@ -11,6 +11,7 @@ import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
@ -42,7 +43,7 @@ public class WorkflowService {
req.valid();
if (!WorkflowDAGUtils.valid(req.getPEWorkflowDAG())) {
throw new OmsException("illegal DAG");
throw new PowerJobException("illegal DAG");
}
Long wfId = req.getId();
@ -130,22 +131,27 @@ public class WorkflowService {
* Run a workflow immediately
* @param wfId workflow ID
* @param appId owning application ID
* @param initParams startup parameters
* @param delay delay time
* @return the workflow instance's instanceId (wfInstanceId)
*/
public Long runWorkflow(Long wfId, Long appId) {
public Long runWorkflow(Long wfId, Long appId, String initParams, long delay) {
WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
Long wfInstanceId = workflowInstanceManager.create(wfInfo);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams);
// officially start the workflow
workflowInstanceManager.start(wfInfo, wfInstanceId);
if (delay <= 0) {
workflowInstanceManager.start(wfInfo, wfInstanceId, initParams);
}else {
InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, initParams));
}
return wfInstanceId;
}
private WorkflowInfoDO permissionCheck(Long wfId, Long appId) {
WorkflowInfoDO wfInfo = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id: " + wfId));
if (!wfInfo.getAppId().equals(appId)) {
throw new OmsException("Permission Denied!can't delete other appId's workflow!");
throw new PowerJobException("Permission Denied!can't delete other appId's workflow!");
}
return wfInfo;
}

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.powerjob.server.web;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.converter.HttpMessageNotReadableException;
@ -25,7 +25,7 @@ public class ControllerExceptionHandler {
public ResultDTO<Void> exceptionHandler(Exception e) {
// not every exception needs its full stack trace printed; internal Exception types can be defined later to make classification easier
if (e instanceof IllegalArgumentException || e instanceof OmsException) {
if (e instanceof IllegalArgumentException || e instanceof PowerJobException) {
log.warn("[ControllerException] http request failed, message is {}.", e.getMessage());
} else if (e instanceof HttpMessageNotReadableException || e instanceof MethodArgumentTypeMismatchException) {
log.warn("[ControllerException] invalid http request params, exception is {}.", e.getMessage());

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.web.controller;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
@ -145,10 +146,10 @@ public class InstanceController {
private String getTargetServer(Long instanceId) {
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfo == null) {
throw new RuntimeException("invalid instanceId: " + instanceId);
throw new PowerJobException("invalid instanceId: " + instanceId);
}
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(instanceInfo.getAppId());
return appInfoOpt.orElseThrow(() -> new RuntimeException("impossible")).getCurrentServer();
return appInfoOpt.orElseThrow(() -> new PowerJobException("impossible")).getCurrentServer();
}
}

View File

@ -150,8 +150,8 @@ public class OpenAPIController {
}
@PostMapping(OpenAPIConstant.RUN_WORKFLOW)
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId) {
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId));
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId, @RequestParam(required = false) String initParams, @RequestParam(required = false) Long delay) {
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, initParams, delay == null ? 0 : delay));
}
/* ************* Workflow Instance section ************* */

View File

@ -79,7 +79,7 @@ public class WorkflowController {
@GetMapping("/run")
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId) {
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId));
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, null, 0));
}
private static PageResult<WorkflowInfoVO> convertPage(Page<WorkflowInfoDO> originPage) {

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.powerjob.server.web.request;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
@ -21,7 +21,7 @@ public class ModifyAppInfoRequest {
public void valid() {
CommonUtils.requireNonNull(appName, "appName can't be empty");
if (StringUtils.containsWhitespace(appName)) {
throw new OmsException("appName can't contains white space!");
throw new PowerJobException("appName can't contains white space!");
}
}
}

View File

@@ -11,7 +11,7 @@ spring.datasource.core.hikari.minimum-idle=5
####### MongoDB configuration (non-core dependency; disable with oms.mongodb.enable=false) #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily
spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-daily
####### mail configuration (if mail alerting is not needed, delete the settings below to avoid startup errors) #######
spring.mail.host=smtp.163.com

***

@@ -8,6 +8,7 @@ ${AnsiColor.GREEN}
░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████
░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░
${AnsiColor.BRIGHT_RED}
* Maintainer: tengjiqi@gmail.com
* Maintainer: tengjiqi@gmail.com & PowerJob-Team
* OfficialWebsite: http://www.powerjob.tech/
* SourceCode: https://github.com/KFCFans/PowerJob
* PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4)

***

@@ -8,6 +8,13 @@
-->
<property name="LOG_PATH" value="${user.home}/powerjob-server/logs"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>
<!-- all system error logs are double-written to a dedicated ERROR appender: start -->
<appender name="ERROR_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/powerjob-server-error.log</file>
@@ -61,6 +68,7 @@
<!-- main system log: end -->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="DEFAULT_APPENDER"/>
<appender-ref ref="ERROR_APPENDER"/>
</root>

***

@@ -0,0 +1,26 @@
package com.github.kfcfans.powerjob.server.test;
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import org.junit.Test;
import java.util.Date;
/**
* CRON test
*
* @author tjq
* @since 2020/10/8
*/
public class CronTest {
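// Quartz-style fields: sec min hour day-of-month month day-of-week year;
// this expression therefore fires exactly once, at 13:00:00 on October 8th, 2020.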
private static final String FIXED_CRON = "0 0 13 8 10 ? 2020-2020";
@Test
public void testFixedTimeCron() throws Exception {
CronExpression cronExpression = new CronExpression(FIXED_CRON);
System.out.println(cronExpression.getCronExpression());
System.out.println(cronExpression.getNextValidTimeAfter(new Date()));
}
}

***

@@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.2.3</version>
<version>3.3.0</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.2.3</powerjob.worker.version>
<powerjob.worker.version>3.3.0</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

***

@@ -29,7 +29,7 @@
<file>${LOG_PATH}/powerjob-agent-error.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${LOG_PATH}/powerjob-agent-error.%d{yyyy-MM-dd}.log</FileNamePattern>
<MaxHistory>7</MaxHistory>
<MaxHistory>3</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
@@ -49,7 +49,7 @@
<file>${LOG_PATH}/powerjob-agent-application.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${LOG_PATH}/powerjob-agent-application.%d{yyyy-MM-dd}.log</FileNamePattern>
<MaxHistory>7</MaxHistory>
<MaxHistory>3</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>

***

@@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.2.3</version>
<version>3.3.0</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.starter.version>3.2.3</powerjob.worker.starter.version>
<powerjob.worker.starter.version>3.3.0</powerjob.worker.starter.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- skip this module during deployment -->

***

@@ -36,6 +36,7 @@ public class BroadcastProcessorDemo extends BroadcastProcessor {
public ProcessResult process(TaskContext taskContext) throws Exception {
System.out.println("===== BroadcastProcessorDemo#process ======");
taskContext.getOmsLogger().info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost());
Thread.sleep(45 * 1000);
return new ProcessResult(true);
}

***

@@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>3.2.3</version>
<version>3.3.0</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.2.3</powerjob.worker.version>
<powerjob.worker.version>3.3.0</powerjob.worker.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
</properties>

***

@@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.2.3</version>
<version>3.3.0</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.2.3</powerjob.common.version>
<powerjob.common.version>3.3.0</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

***

@@ -5,7 +5,7 @@ import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
@@ -163,16 +163,16 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
return appId;
}else {
log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
throw new OmsException(resultDTO.getMessage());
throw new PowerJobException(resultDTO.getMessage());
}
}catch (OmsException oe) {
}catch (PowerJobException oe) {
throw oe;
}catch (Exception ignore) {
log.warn("[OhMyWorker] assert appName by url({}) failed, please check the server address.", realUrl);
}
}
log.error("[OhMyWorker] no available server in {}.", config.getServerAddress());
throw new OmsException("no server available!");
throw new PowerJobException("no server available!");
}
@Override

***

@@ -65,7 +65,7 @@ public class TaskTrackerActor extends AbstractActor {
taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult());
}
taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
}
/**

***

@@ -11,21 +11,28 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public final class OmsBannerPrinter {
private static final String BANNER = "\n ███████ ██ ██ \n" +
"░██░░░░██ ░██ ░██ \n" +
"░██ ░██ ██████ ███ ██ █████ ██████ ░██ ██████ ░██ \n" +
"░███████ ██░░░░██░░██ █ ░██ ██░░░██░░██░░█ ░██ ██░░░░██░██████ \n" +
private static final String BANNER = "" +
"\n" +
" ███████ ██ ██\n" +
"░██░░░░██ ░██ ░██\n" +
"░██ ░██ ██████ ███ ██ █████ ██████ ░██ ██████ ░██\n" +
"░███████ ██░░░░██░░██ █ ░██ ██░░░██░░██░░█ ░██ ██░░░░██░██████\n" +
"░██░░░░ ░██ ░██ ░██ ███░██░███████ ░██ ░ ░██░██ ░██░██░░░██\n" +
"░██ ░██ ░██ ░████░████░██░░░░ ░██ ██ ░██░██ ░██░██ ░██\n" +
"░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████ \n" +
"░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░ \n";
"░██ ░░██████ ███░ ░░░██░░██████░███ ░░█████ ░░██████ ░██████\n" +
"░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░\n" +
"\n" +
"* Maintainer: tengjiqi@gmail.com & PowerJob-Team\n" +
"* OfficialWebsite: http://www.powerjob.tech/\n" +
"* SourceCode: https://github.com/KFCFans/PowerJob\n" +
"\n";
public static void print() {
log.info(BANNER);
String version = OmsWorkerVersion.getVersion();
version = (version != null) ? " (v" + version + ")" : "";
log.info(":: OhMyScheduler Worker :: {}", version);
log.info(":: PowerJob Worker :: {}", version);
}
}

***

@@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.worker.container;
import com.github.kfcfans.powerjob.common.ContainerConstant;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
@@ -106,7 +106,7 @@ public class OmsJarContainer implements OmsContainer {
if (propertiesURLStream == null) {
log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath());
throw new OmsException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME);
throw new PowerJobException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME);
}
properties.load(propertiesURLStream);
@@ -115,7 +115,7 @@ public class OmsJarContainer implements OmsContainer {
String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY);
if (StringUtils.isEmpty(packageName)) {
log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId);
throw new OmsException("invalid jar file");
throw new PowerJobException("invalid jar file");
}
// load the user's classes

***

@@ -63,7 +63,7 @@ public class ProcessorTracker {
private ThreadPoolExecutor threadPool;
private ScheduledExecutorService timingPool;
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100;
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;
// a ProcessorTracker that stays idle too long will request its own destruction
private static final long MAX_IDLE_TIME = 300000;
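A plausible shape for a pool guarded by that queue cap (a sketch only; the pool sizes and rejection policy here are assumptions, not the project's actual configuration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolSketch {

    private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;

    public static ThreadPoolExecutor newProcessorPool(int coreSize, int maxSize) {
        // Bounded queue: once 128 tasks are waiting, new submissions are rejected
        // instead of buffering work without limit.
        return new ThreadPoolExecutor(coreSize, maxSize, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE),
                new ThreadPoolExecutor.AbortPolicy());
    }
}
```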
@@ -318,12 +318,12 @@ public class ProcessorTracker {
break;
default:
log.warn("[ProcessorTracker-{}] unknown processor type: {}.", instanceId, processorType);
throw new OmsException("unknown processor type of " + processorType);
throw new PowerJobException("unknown processor type of " + processorType);
}
if (processor == null) {
log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo);
throw new OmsException("fetch Processor failed");
throw new PowerJobException("fetch Processor failed");
}
}

***

@@ -110,7 +110,7 @@ public class CommonTaskTracker extends TaskTracker {
log.info("[TaskTracker-{}] create root task successfully.", instanceId);
}else {
log.error("[TaskTracker-{}] create root task failed.", instanceId);
throw new OmsException("create root task failed for instance: " + instanceId);
throw new PowerJobException("create root task failed for instance: " + instanceId);
}
}

***

@@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -91,7 +92,7 @@ public class FrequentTaskTracker extends TaskTracker {
if (timeExpressionType == TimeExpressionType.FIX_RATE) {
// fixed-rate mode requires a minimum interval
if (timeParams < MIN_INTERVAL) {
throw new OmsException("time interval too small, please set the timeExpressionInfo >= 1000");
throw new PowerJobException("time interval too small, please set the timeExpressionInfo >= 1000");
}
scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS);
}else {
@@ -123,6 +124,9 @@
history.add(subDetail);
});
// sort by subInstanceId, issue#63
history.sort((o1, o2) -> (int) (o2.getSubInstanceId() - o1.getSubInstanceId()));
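// (note: Long.compare(o2.getSubInstanceId(), o1.getSubInstanceId()) would be the overflow-safe form of this comparator)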
detail.setSubInstanceDetails(history);
return detail;
}
@@ -141,15 +145,10 @@
// sub-instance ID
Long subInstanceId = triggerTimes.incrementAndGet();
// record timing
SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder();
timeHolder.startTime = timeHolder.lastActiveTime = System.currentTimeMillis();
subInstanceId2TimeHolder.put(subInstanceId, timeHolder);
// execution-record cache
// execution-record cache: it is display-only, so it can safely be created up front
SubInstanceInfo subInstanceInfo = new SubInstanceInfo();
subInstanceInfo.status = TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue();
subInstanceInfo.startTime = timeHolder.startTime;
subInstanceInfo.startTime = System.currentTimeMillis();
recentSubInstanceInfo.put(subInstanceId, subInstanceInfo);
String myAddress = OhMyWorker.getWorkerAddress();
@@ -174,7 +173,7 @@
if (maxInstanceNum > 0) {
if (timeExpressionType == TimeExpressionType.FIX_RATE) {
if (subInstanceId2TimeHolder.size() > maxInstanceNum) {
log.warn("[TaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId);
log.warn("[FQTaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId);
processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE");
return;
}
@@ -183,11 +182,16 @@
// must persist first and dispatch only after persistence succeeds; otherwise later steps fail in all sorts of ways, because the DB holds no record for this taskId
if (!taskPersistenceService.save(newRootTask)) {
log.error("[TaskTracker-{}] Launcher create new root task failed.", instanceId);
log.error("[FQTaskTracker-{}] Launcher create new root task failed.", instanceId);
processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED");
return;
}
// create the record info (it must only be created after persistence succeeds, otherwise a LAUNCH_FAILED error follows)
SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder();
timeHolder.startTime = System.currentTimeMillis();
subInstanceId2TimeHolder.put(subInstanceId, timeHolder);
dispatchTask(newRootTask, myAddress);
}
@@ -196,7 +200,7 @@
try {
innerRun();
}catch (Exception e) {
log.error("[TaskTracker-{}] launch task failed.", instanceId, e);
log.error("[FQTaskTracker-{}] launch task failed.", instanceId, e);
}
}
}
@@ -206,8 +210,6 @@
*/
private class Checker implements Runnable {
private static final long HEARTBEAT_TIMEOUT_MS = 60000;
@Override
public void run() {
@@ -219,7 +221,7 @@
checkStatus();
reportStatus();
}catch (Exception e) {
log.warn("[TaskTracker-{}] check and report status failed.", instanceId, e);
log.warn("[FQTaskTracker-{}] check and report status failed.", instanceId, e);
}
}
@@ -237,12 +239,10 @@
SubInstanceTimeHolder timeHolder = entry.getValue();
long executeTimeout = nowTS - timeHolder.startTime;
long heartbeatTimeout = nowTS - timeHolder.lastActiveTime;
// timeout covers both total-runtime timeout and heartbeat timeout; either is judged a failure outright
if (executeTimeout > instanceTimeoutMS || heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) {
onFinished(subInstanceId, false, "TIMEOUT", iterator);
if (executeTimeout > instanceTimeoutMS) {
onFinished(subInstanceId, false, "RUNNING_TIMEOUT", iterator);
continue;
}
@@ -292,14 +292,11 @@
newLastTask.setAddress(OhMyWorker.getWorkerAddress());
submitTask(Lists.newArrayList(newLastTask));
}
}
}
// drop all retry mechanisms: a timeout simply means failure
log.debug("[TaskTracker-{}] check status using {}.", instanceId, stopwatch.stop());
}
log.debug("[FQTaskTracker-{}] check status using {}.", instanceId, stopwatch);
}
private void reportStatus() {
@@ -374,7 +371,6 @@
private static class SubInstanceTimeHolder {
private long startTime;
private long lastActiveTime;
}
}

***

@@ -138,12 +138,13 @@ public abstract class TaskTracker {
/**
* Update a task's status
* V1.0.0 -> V1.0.1 (e405e283ad7f97b0b4e5d369c7de884c0caf9192): locking switched from synchronized (taskId.intern()) to a segment lock, greatly reducing memory usage at the cost of only theoretical concurrency
* @param subInstanceId sub-instance ID
* @param taskId task ID (a task is the execution unit of an instance)
* @param newStatus new status of the task
* @param reportTime report time
* @param result execution result of the task; empty while it has not finished
*/
public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) {
public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
if (finished.get()) {
return;
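The javadoc above references the segment-lock change: each taskId is hashed into a fixed pool of locks, so memory stays bounded by the segment count rather than growing with interned strings. A minimal sketch of the idea (a simplified, hypothetical stand-in, not the project's actual SegmentLock):

```java
import java.util.concurrent.locks.ReentrantLock;

public class SegmentLockSketch {

    private final ReentrantLock[] locks;

    public SegmentLockSketch(int segments) {
        locks = new ReentrantLock[segments];
        for (int i = 0; i < segments; i++) {
            locks[i] = new ReentrantLock();
        }
    }

    // lock the segment that owns this key and hand back its id for unlock()
    public int lockKey(String key) {
        int id = (key.hashCode() & Integer.MAX_VALUE) % locks.length;
        locks[id].lock();
        return id;
    }

    public void unlock(int id) {
        locks[id].unlock();
    }
}
```

Two keys that hash to the same segment serialize on the same lock, which is the "theoretical concurrency" traded away in exchange for a fixed memory footprint.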
@@ -165,7 +166,7 @@
lastReportTime = taskOpt.get().getLastReportTime();
}else {
// in theory this can't happen, unless the database is broken
log.error("[TaskTracker-{}] can't find task by pkey(instanceId={}&taskId={}).", instanceId, instanceId, taskId);
log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
}
if (lastReportTime == null) {
@@ -175,8 +176,8 @@
// filter expired reports (a latent cluster time-consistency concern: clock skew across workers during retries could cause problems)
if (lastReportTime > reportTime) {
log.warn("[TaskTracker-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
instanceId, lastReportTime, reportTime, taskId, newStatus);
log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
instanceId, subInstanceId, lastReportTime, reportTime, taskId, newStatus);
return;
}
@@ -214,7 +215,7 @@
boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
if (retryTask) {
log.info("[TaskTracker-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, taskId);
log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId);
return;
}
}
@@ -226,12 +227,12 @@
boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);
if (!updateResult) {
log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId);
log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId);
}
} catch (InterruptedException ignore) {
} catch (Exception e) {
log.warn("[TaskTracker-{}] update task status failed.", instanceId, e);
log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e);
} finally {
segmentLock.unlock(lockId);
}
@@ -278,7 +279,7 @@
List<TaskDO> unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress);
if (!CollectionUtils.isEmpty(unfinishedTask)) {
log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask);
unfinishedTask.forEach(task -> updateTaskStatus(task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
}
}
}