diff --git a/others/logs/BasicFunctionTestRecord.md b/others/logs/BasicFunctionTestRecord.md deleted file mode 100644 index 4ffb171a..00000000 --- a/others/logs/BasicFunctionTestRecord.md +++ /dev/null @@ -1,105 +0,0 @@ -# 2020.4.8 第一轮测试 -## 测试用例 -* MapReduce任务:http://localhost:7700/job/save?appId=1&concurrency=5&executeType=MAP_REDUCE&groupName=null&instanceRetryNum=3&instanceTimeLimit=4545454545&jobDescription=jobDescription&jobName=testJob&jobParams=%7B%22a%22%3A%22b%22%7D&maxInstanceNum=1&processorInfo=com.github.kfcfans.powerjob.processors.TestMapReduceProcessor&processorType=EMBEDDED_JAVA&status=1&taskRetryNum=3&taskTimeLimit=564465656&timeExpression=0%20*%20*%20*%20*%20%3F%20&timeExpressionType=CRON - -## 问题记录 -#### 任务执行成功,释放资源失败 -第一个任务执行完成后,释放资源阶段(删除本地H2数据库中所有记录)报错,堆栈如下: -```text -2020-04-08 10:09:19 INFO - [ProcessorTracker-1586311659084] mission complete, ProcessorTracker already destroyed! -2020-04-08 10:09:19 ERROR - [TaskPersistenceService] deleteAllTasks failed, instanceId=1586311659084. 
-java.lang.InterruptedException: sleep interrupted - at java.lang.Thread.sleep(Native Method) - at CommonUtils.executeWithRetry(CommonUtils.java:34) - at TaskPersistenceService.execute(TaskPersistenceService.java:297) - at TaskPersistenceService.deleteAllTasks(TaskPersistenceService.java:269) - at CommonTaskTracker.destroy(TaskTracker.java:231) - at CommonTaskTracker$StatusCheckRunnable.innerRun(TaskTracker.java:421) - at CommonTaskTracker$StatusCheckRunnable.run(TaskTracker.java:467) - at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) - at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) - at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) - at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) - at java.lang.Thread.run(Thread.java:748) -2020-04-08 10:09:19 WARN - [TaskTracker-1586311659084] delete tasks from database failed. -2020-04-08 10:09:19 INFO - [TaskTracker-1586311659084] TaskTracker has left the world. -``` -随后,Server派发下来的第二个任务也无法完成创建,异常堆栈如下: -```text -2020-04-08 10:10:08 ERROR - [TaskPersistenceService] save taskTaskDO{taskId='0', jobId='1', instanceId='1586311804030', taskName='OMS_ROOT_TASK', address='10.37.129.2:2777', status=1, result='null', failedCnt=0, createdTime=1586311808295, lastModifiedTime=1586311808295} failed. -2020-04-08 10:10:08 ERROR - [TaskTracker-1586311804030] create root task failed. -[ERROR] [04/08/2020 10:10:08.511] [oms-akka.actor.internal-dispatcher-20] [akka://oms/user/task_tracker] create root task failed. -java.lang.RuntimeException: create root task failed. 
- at CommonTaskTracker.persistenceRootTask(TaskTracker.java:208) - at CommonTaskTracker.(TaskTracker.java:81) - at TaskTrackerActor.lambda$onReceiveServerScheduleJobReq$2(TaskTrackerActor.java:138) - at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) - at TaskTrackerPool.atomicCreateTaskTracker(TaskTrackerPool.java:30) - at TaskTrackerActor.onReceiveServerScheduleJobReq(TaskTrackerActor.java:138) -``` -*** -原因及解决方案:destroy方法调用了scheduledPool.shutdownNow()方法导致调用该方法的线程池被强制关闭,该方法也自然被中断,数据删到一半没删掉,破坏了数据库结构,后面的insert自然也就失败了。 - -# 2020.4.11 "集群"测试 -#### 任务重试机制失效 -原因:SQL中的now()函数返回的是Datetime,不能用ing/bigint去接收... - -#### SystemMetric算分问题 -问题:java.lang.management.OperatingSystemMXBean#getSystemLoadAverage 不一定能获取CPU当前负载,可能返回负数代表不可用... -解决方案:印度Windows上getSystemLoadAverage()固定返回-1...太坑了...先做个保护性判断继续测试吧... - -#### 未知的数组越界问题(可能是数据库性能问题) -问题:秒级Broadcast任务在第四次执行时,当Processor完成执行上报状态时,TaskTracker报错,错误的本质原因是无法从数据库中找到这个task对应的记录... -场景:时间表达式:FIX_DELAY,对应的TaskTracker为FrequentTaskTracker - -异常堆栈 -```text -2020-04-16 18:05:09 ERROR - [TaskPersistenceService] getTaskStatus failed, instanceId=1586857062542,taskId=4. 
-java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 - at java.util.LinkedList.checkElementIndex(LinkedList.java:555) - at java.util.LinkedList.get(LinkedList.java:476) - at TaskPersistenceService.lambda$getTaskStatus$10(TaskPersistenceService.java:214) - at CommonUtils.executeWithRetry(CommonUtils.java:37) - at TaskPersistenceService.execute(TaskPersistenceService.java:310) - at TaskPersistenceService.getTaskStatus(TaskPersistenceService.java:212) - at TaskTracker.updateTaskStatus(TaskTracker.java:107) - at TaskTracker.broadcast(TaskTracker.java:214) - at TaskTrackerActor.onReceiveBroadcastTaskPreExecuteFinishedReq(TaskTrackerActor.java:106) - at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) - at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) - at scala.PartialFunction.applyOrElse(PartialFunction.scala:187) - at scala.PartialFunction.applyOrElse$(PartialFunction.scala:186) - at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) - at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:241) - at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242) - at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242) - at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242) - at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242) - at akka.actor.Actor.aroundReceive(Actor.scala:534) - at akka.actor.Actor.aroundReceive$(Actor.scala:532) - at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) - at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573) - at akka.actor.ActorCell.invoke(ActorCell.scala:543) - at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269) - at akka.dispatch.Mailbox.run(Mailbox.scala:230) - at akka.dispatch.Mailbox.exec(Mailbox.scala:242) - at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) - at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) - at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) - at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) -2020-04-16 18:05:09 WARN - [TaskTracker-1586857062542] query TaskStatus from DB failed when try to update new TaskStatus(taskId=4,newStatus=6). -``` -解决方案:初步怀疑在连续更改时,由于数据库锁的存在导致行不可见(不知道H2具体的特性)。因此,需要保证同一个taskId串行更新 -> synchronize Yes! - -# 2020.4.20 1.0.0发布前测试 -#### Server & Worker -* 指定机器执行 -> 验证通过 -* Map/MapReduce/Standalone/Broadcast/Shell/Python处理器的执行 -> 验证通过 -* 超时失败 -> 验证通过 -* 破坏测试:指定错误的处理器 -> 发现问题,会造成死锁(TT创建PT,PT创建失败,无法定期汇报心跳,TT长时间未收到PT心跳,认为PT宕机(确实宕机了),无法选择可用的PT再次派发任务,死锁形成,GG斯密达 T_T)。通过确保ProcessorTracker一定能创建成功解决,如果处理器构建失败,之后所有提交的任务直接返回错误。 -#### Client -* StopInstance -> success -* FetchInstanceStatus -> success - diff --git a/others/logs/ContainerTestRecord.md b/others/logs/ContainerTestRecord.md deleted file mode 100644 index 7a0d3326..00000000 --- a/others/logs/ContainerTestRecord.md +++ /dev/null @@ -1,34 +0,0 @@ -# 容器测试日志 -## ClassNotFound问题 ->玩热加载这一套,不来几个ClassNotFound都没那味 [滑稽]~ - -测试容器化的MapReduce任务时,发现如下错误: -```text -2020-05-19 09:33:18 ERROR - [ProcessorRunnable-142925055284740224] execute failed, please fix this bug @tjq! 
-com.esotericsoftware.kryo.KryoException: Unable to find class: cn.edu.zju.oms.container.ContainerMRProcessor$TestSubTask - at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:182) - at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:151) - at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:684) - at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:795) - at SerializerUtils.deSerialized(SerializerUtils.java:48) - at ProcessorRunnable.innerRun(ProcessorRunnable.java:63) - at ProcessorRunnable.run(ProcessorRunnable.java:179) - at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) - at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) - at java.util.concurrent.FutureTask.run(FutureTask.java) - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) - at java.lang.Thread.run(Thread.java:748) -Caused by: java.lang.ClassNotFoundException: cn.edu.zju.oms.container.ContainerMRProcessor$TestSubTask - at java.net.URLClassLoader.findClass(URLClassLoader.java:382) - at java.lang.ClassLoader.loadClass(ClassLoader.java:418) - at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) - at java.lang.ClassLoader.loadClass(ClassLoader.java:351) - at java.lang.Class.forName0(Native Method) - at java.lang.Class.forName(Class.java:348) - at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:176) - ... 
12 common frames omitted -``` - -* 原因分析:经过分析,原有在于序列化与反序列化过程中,框架为了追求性能,采用了**对象池**技术(库存代码: a14f554e0085b6a179375a8ca04665434b73c7bd#SerializerUtils),而Kryo在序列化和反序列化过程中只会使用固定的类加载器(创建kryo的类对象(Kryo.class)的类加载器),因此无法找到由OMS自定义类加载器创建的容器类。 -* 解决方案:弃用性能优异的对象池技术,该用ThreadLocal + 手动设置Kryo类加载器。 \ No newline at end of file diff --git a/others/logs/OnlineLogTestRecord.md b/others/logs/OnlineLogTestRecord.md deleted file mode 100644 index 7ce8eb94..00000000 --- a/others/logs/OnlineLogTestRecord.md +++ /dev/null @@ -1,100 +0,0 @@ -## V1.0.0 -#### 持久化链路 -1. 客户端使用内存队列异步化批量上报服务器 -2. 服务器接收到请求后无脑写H2数据库 -3. 任务结束后,流式同步到MongoDB持久化存储,维护一个包含Array的MongoDB对象 -4. 同步结束后删除本地所有数据 -#### 查询链路 -* 如果本地存在数据,则直接从本地数据库返回 -* 如果本地不存在数据,则直连MongoDB,获取数据再返回 - -*** -问题主要在于前台展示:测试100W条数据,本地H2占用82M,MongoDB未知(因为mongo shell不知道为啥用不了...),不过应该也小不到哪里去。这种情况下数据都没办法回传回来...需要更新方案。 - -```text -org.apache.catalina.connector.ClientAbortException: java.io.IOException: Broken pipe - at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:351) - at org.apache.catalina.connector.OutputBuffer.flushByteBuffer(OutputBuffer.java:776) - at org.apache.catalina.connector.OutputBuffer.append(OutputBuffer.java:681) - at org.apache.catalina.connector.OutputBuffer.writeBytes(OutputBuffer.java:386) - at org.apache.catalina.connector.OutputBuffer.write(OutputBuffer.java:364) - at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:96) - at com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:2137) - at com.fasterxml.jackson.core.json.UTF8JsonGenerator.flush(UTF8JsonGenerator.java:1150) - at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:923) - at org.springframework.http.converter.json.AbstractJackson2HttpMessageConverter.writeInternal(AbstractJackson2HttpMessageConverter.java:287) - at org.springframework.http.converter.AbstractGenericHttpMessageConverter.write(AbstractGenericHttpMessageConverter.java:104) - at 
org.springframework.web.servlet.mvc.method.annotation.AbstractMessageConverterMethodProcessor.writeWithMessageConverters(AbstractMessageConverterMethodProcessor.java:287) - at org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor.handleReturnValue(RequestResponseBodyMethodProcessor.java:181) - at org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite.handleReturnValue(HandlerMethodReturnValueHandlerComposite.java:82) - at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:123) - at org.springframework.web.servlet.mvc.method.annotation.ExceptionHandlerExceptionResolver.doResolveHandlerMethodException(ExceptionHandlerExceptionResolver.java:403) - at org.springframework.web.servlet.handler.AbstractHandlerMethodExceptionResolver.doResolveException(AbstractHandlerMethodExceptionResolver.java:61) - at org.springframework.web.servlet.handler.AbstractHandlerExceptionResolver.resolveException(AbstractHandlerExceptionResolver.java:141) - at org.springframework.web.servlet.handler.HandlerExceptionResolverComposite.resolveException(HandlerExceptionResolverComposite.java:80) - at org.springframework.web.servlet.DispatcherServlet.processHandlerException(DispatcherServlet.java:1300) - at org.springframework.web.servlet.DispatcherServlet.processDispatchResult(DispatcherServlet.java:1111) - at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1057) - at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) - at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) - at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) - at javax.servlet.http.HttpServlet.service(HttpServlet.java:634) - at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) - at 
javax.servlet.http.HttpServlet.service(HttpServlet.java:741) - at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) - at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) - at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) - at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) - at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) - at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) - at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) - at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) - at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) - at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) - at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) - at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) - at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) - at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) - at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) - at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) - at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) - at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) - at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) - at 
org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541) - at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) - at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) - at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) - at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) - at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:373) - at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) - at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) - at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1594) - at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) - at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) - at java.lang.Thread.run(Thread.java:748) -Caused by: java.io.IOException: Broken pipe - at sun.nio.ch.FileDispatcherImpl.write0(Native Method) - at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) - at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) - at sun.nio.ch.IOUtil.write(IOUtil.java:65) - at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) - at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:138) - at org.apache.tomcat.util.net.NioBlockingSelector.write(NioBlockingSelector.java:101) - at org.apache.tomcat.util.net.NioSelectorPool.write(NioSelectorPool.java:152) - at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1253) - at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:740) - at 
org.apache.tomcat.util.net.SocketWrapperBase.writeBlocking(SocketWrapperBase.java:560) - at org.apache.tomcat.util.net.SocketWrapperBase.write(SocketWrapperBase.java:504) - at org.apache.coyote.http11.Http11OutputBuffer$SocketOutputBuffer.doWrite(Http11OutputBuffer.java:538) - at org.apache.coyote.http11.filters.ChunkedOutputFilter.doWrite(ChunkedOutputFilter.java:110) - at org.apache.coyote.http11.Http11OutputBuffer.doWrite(Http11OutputBuffer.java:190) - at org.apache.coyote.Response.doWrite(Response.java:601) - at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:339) - ... 60 common frames omitted - -``` - -## V2.0.0 ->经过小小的调查,mongoDB似乎允许用户直接使用它的文件系统:GridFS,完成文件的存储。那么要不要改成文件对文件的形式呢?同步开始时,先在本地生成日志文件,然后同步到MongoDB。查询时则先下载文件。一旦拥有了完整的文件,分页什么的也就容易实现了,前端展示一次1000行之类的~ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java index c05fe17d..90a9fd8c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java @@ -193,7 +193,7 @@ public class InstanceLogService { try { instanceId2LastReportTime.remove(instanceId); CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); - log.warn("[InstanceLog-{}] delete local instanceLog successfully.", instanceId); + log.info("[InstanceLog-{}] delete local instanceLog successfully.", instanceId); }catch (Exception e) { log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e); } diff --git a/powerjob-worker-agent/src/main/resources/logback.xml b/powerjob-worker-agent/src/main/resources/logback.xml index c8d4ec08..ac501a9a 100644 --- a/powerjob-worker-agent/src/main/resources/logback.xml +++ b/powerjob-worker-agent/src/main/resources/logback.xml @@ -29,7 
+29,7 @@ ${LOG_PATH}/powerjob-agent-error.log ${LOG_PATH}/powerjob-agent-error.%d{yyyy-MM-dd}.log - 7 + 3 %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -49,7 +49,7 @@ ${LOG_PATH}/powerjob-agent-application.log ${LOG_PATH}/powerjob-agent-application.%d{yyyy-MM-dd}.log - 7 + 3 %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java index 9d7e898f..73bb3ec7 100644 --- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java @@ -36,6 +36,7 @@ public class BroadcastProcessorDemo extends BroadcastProcessor { public ProcessResult process(TaskContext taskContext) throws Exception { System.out.println("===== BroadcastProcessorDemo#process ======"); taskContext.getOmsLogger().info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost()); + Thread.sleep(45 * 1000); return new ProcessResult(true); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java index cbb69d8d..4b53ae84 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java @@ -65,7 +65,7 @@ public class TaskTrackerActor extends AbstractActor { taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult()); } - taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, 
req.getReportTime(), req.getResult()); + taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult()); } /** diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index f56e9a62..5d43a969 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; +import javax.annotation.Nullable; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -104,6 +105,17 @@ public class FrequentTaskTracker extends TaskTracker { scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS); } + /** Delegates to the base implementation, then refreshes the last-active timestamp of the reporting sub-instance. */ + @Override + public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) { + super.updateTaskStatus(subInstanceId, taskId, newStatus, reportTime, result); + // Refresh lastActiveTime; get() yields null when no holder is registered for this sub-instance (e.g. a late report), so guard before dereferencing + SubInstanceTimeHolder timeHolder = subInstanceId2TimeHolder.get(subInstanceId); + if (timeHolder != null) { + timeHolder.lastActiveTime = Math.max(reportTime, timeHolder.lastActiveTime); + } + } + @Override public InstanceDetail fetchRunningStatus() { InstanceDetail detail = new InstanceDetail(); @@ -144,15 +153,10 @@ public class FrequentTaskTracker extends TaskTracker { // 子任务实例ID Long subInstanceId = triggerTimes.incrementAndGet(); - // 记录时间 - SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder(); - timeHolder.startTime = timeHolder.lastActiveTime = System.currentTimeMillis(); - subInstanceId2TimeHolder.put(subInstanceId, timeHolder); - - // 执行记录缓存 + // 
执行记录缓存(只做展示,因此可以放在前面) SubInstanceInfo subInstanceInfo = new SubInstanceInfo(); subInstanceInfo.status = TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue(); - subInstanceInfo.startTime = timeHolder.startTime; + subInstanceInfo.startTime = System.currentTimeMillis(); recentSubInstanceInfo.put(subInstanceId, subInstanceInfo); String myAddress = OhMyWorker.getWorkerAddress(); @@ -191,6 +195,11 @@ public class FrequentTaskTracker extends TaskTracker { return; } + // 生成记录信息(必须保证持久化成功才能生成该记录,否则会导致 LAUNCH_FAILED 错误) + SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder(); + timeHolder.startTime = timeHolder.lastActiveTime = System.currentTimeMillis(); + subInstanceId2TimeHolder.put(subInstanceId, timeHolder); + dispatchTask(newRootTask, myAddress); } @@ -243,9 +252,13 @@ public class FrequentTaskTracker extends TaskTracker { long heartbeatTimeout = nowTS - timeHolder.lastActiveTime; // 超时(包含总运行时间超时和心跳包超时),直接判定为失败 - if (executeTimeout > instanceTimeoutMS || heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) { + if (executeTimeout > instanceTimeoutMS) { + onFinished(subInstanceId, false, "RUNNING_TIMEOUT", iterator); + continue; + } - onFinished(subInstanceId, false, "TIMEOUT", iterator); + if (heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) { + onFinished(subInstanceId, false, "HEARTBEAT_TIMEOUT", iterator); continue; } @@ -295,14 +308,11 @@ public class FrequentTaskTracker extends TaskTracker { newLastTask.setAddress(OhMyWorker.getWorkerAddress()); submitTask(Lists.newArrayList(newLastTask)); } - } } - // 舍去一切重试机制,反正超时就失败 - - log.debug("[TaskTracker-{}] check status using {}.", instanceId, stopwatch.stop()); } + log.debug("[TaskTracker-{}] check status using {}.", instanceId, stopwatch); } private void reportStatus() { diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index a34401c4..1974e0c7 100644 --- 
a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -138,12 +138,13 @@ public abstract class TaskTracker { /** * 更新Task状态 * V1.0.0 -> V1.0.1(e405e283ad7f97b0b4e5d369c7de884c0caf9192) 锁方案变更,从 synchronized (taskId.intern()) 修改为分段锁,能大大减少内存占用,损失的只有理论并发度而已 + * @param subInstanceId 子任务实例ID * @param taskId task的ID(task为任务实例的执行单位) * @param newStatus task的新状态 * @param reportTime 上报时间 * @param result task的执行结果,未执行完成时为空 */ - public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) { + public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) { if (finished.get()) { return; @@ -278,7 +279,7 @@ public abstract class TaskTracker { List unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress); if (!CollectionUtils.isEmpty(unfinishedTask)) { log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask); - unfinishedTask.forEach(task -> updateTaskStatus(task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result")); + unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result")); } } }