Merge branch 'v3.3.0' into jenkins_auto_build

tjq 2020-10-08 17:08:37 +08:00
commit 4773a0cddb
9 changed files with 31 additions and 258 deletions


@@ -1,105 +0,0 @@
# 2020.4.8 First Round of Testing
## Test Cases
* MapReduce task: http://localhost:7700/job/save?appId=1&concurrency=5&executeType=MAP_REDUCE&groupName=null&instanceRetryNum=3&instanceTimeLimit=4545454545&jobDescription=jobDescription&jobName=testJob&jobParams=%7B%22a%22%3A%22b%22%7D&maxInstanceNum=1&processorInfo=com.github.kfcfans.powerjob.processors.TestMapReduceProcessor&processorType=EMBEDDED_JAVA&status=1&taskRetryNum=3&taskTimeLimit=564465656&timeExpression=0%20*%20*%20*%20*%20%3F%20&timeExpressionType=CRON
## Issue Log
#### Task executed successfully, but releasing resources failed
After the first task finished, deleting all records from the local H2 database during the resource-release phase failed with the stack trace below:
```text
2020-04-08 10:09:19 INFO - [ProcessorTracker-1586311659084] mission complete, ProcessorTracker already destroyed!
2020-04-08 10:09:19 ERROR - [TaskPersistenceService] deleteAllTasks failed, instanceId=1586311659084.
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at CommonUtils.executeWithRetry(CommonUtils.java:34)
at TaskPersistenceService.execute(TaskPersistenceService.java:297)
at TaskPersistenceService.deleteAllTasks(TaskPersistenceService.java:269)
at CommonTaskTracker.destroy(TaskTracker.java:231)
at CommonTaskTracker$StatusCheckRunnable.innerRun(TaskTracker.java:421)
at CommonTaskTracker$StatusCheckRunnable.run(TaskTracker.java:467)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-04-08 10:09:19 WARN - [TaskTracker-1586311659084] delete tasks from database failed.
2020-04-08 10:09:19 INFO - [TaskTracker-1586311659084] TaskTracker has left the world.
```
随后Server派发下来的第二个任务也无法完成创建异常堆栈如下
```text
2020-04-08 10:10:08 ERROR - [TaskPersistenceService] save taskTaskDO{taskId='0', jobId='1', instanceId='1586311804030', taskName='OMS_ROOT_TASK', address='10.37.129.2:2777', status=1, result='null', failedCnt=0, createdTime=1586311808295, lastModifiedTime=1586311808295} failed.
2020-04-08 10:10:08 ERROR - [TaskTracker-1586311804030] create root task failed.
[ERROR] [04/08/2020 10:10:08.511] [oms-akka.actor.internal-dispatcher-20] [akka://oms/user/task_tracker] create root task failed.
java.lang.RuntimeException: create root task failed.
at CommonTaskTracker.persistenceRootTask(TaskTracker.java:208)
at CommonTaskTracker.<init>(TaskTracker.java:81)
at TaskTrackerActor.lambda$onReceiveServerScheduleJobReq$2(TaskTrackerActor.java:138)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
at TaskTrackerPool.atomicCreateTaskTracker(TaskTrackerPool.java:30)
at TaskTrackerActor.onReceiveServerScheduleJobReq(TaskTrackerActor.java:138)
```
***
原因及解决方案destroy方法调用了scheduledPool.shutdownNow()方法导致调用该方法的线程池被强制关闭该方法也自然被中断数据删到一半没删掉破坏了数据库结构后面的insert自然也就失败了。
# 2020.4.11 "集群"测试
#### 任务重试机制失效
原因SQL中的now()函数返回的是Datetime不能用ing/bigint去接收...
#### SystemMetric算分问题
问题java.lang.management.OperatingSystemMXBean#getSystemLoadAverage 不一定能获取CPU当前负载可能返回负数代表不可用...
解决方案印度Windows上getSystemLoadAverage()固定返回-1...太坑了...先做个保护性判断继续测试吧...
#### Unexplained index-out-of-bounds (possibly a database performance issue)
Problem: on the fourth run of a second-level Broadcast task, when the Processor finished executing and reported its status, the TaskTracker threw an error; the root cause is that no record for that task could be found in the database...
Scenario: time expression FIX_DELAY, whose TaskTracker is a FrequentTaskTracker.
Stack trace:
```text
2020-04-16 18:05:09 ERROR - [TaskPersistenceService] getTaskStatus failed, instanceId=1586857062542,taskId=4.
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.LinkedList.checkElementIndex(LinkedList.java:555)
at java.util.LinkedList.get(LinkedList.java:476)
at TaskPersistenceService.lambda$getTaskStatus$10(TaskPersistenceService.java:214)
at CommonUtils.executeWithRetry(CommonUtils.java:37)
at TaskPersistenceService.execute(TaskPersistenceService.java:310)
at TaskPersistenceService.getTaskStatus(TaskPersistenceService.java:212)
at TaskTracker.updateTaskStatus(TaskTracker.java:107)
at TaskTracker.broadcast(TaskTracker.java:214)
at TaskTrackerActor.onReceiveBroadcastTaskPreExecuteFinishedReq(TaskTrackerActor.java:106)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:187)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:186)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:241)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242)
at akka.actor.Actor.aroundReceive(Actor.scala:534)
at akka.actor.Actor.aroundReceive$(Actor.scala:532)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
at akka.actor.ActorCell.invoke(ActorCell.scala:543)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
2020-04-16 18:05:09 WARN - [TaskTracker-1586857062542] query TaskStatus from DB failed when try to update new TaskStatus(taskId=4,newStatus=6).
```
解决方案初步怀疑在连续更改时由于数据库锁的存在导致行不可见不知道H2具体的特性。因此需要保证同一个taskId串行更新 -> synchronize Yes
# 2020.4.20 Pre-release Testing for 1.0.0
#### Server & Worker
* Run on a designated machine -> verified
* Execution of Map/MapReduce/Standalone/Broadcast/Shell/Python processors -> verified
* Timeout failure -> verified
* Destructive test: configure a wrong processor -> found a deadlock (TT creates PT; PT creation fails, so it can never report heartbeats; TT, having received no PT heartbeat for a long time, decides the PT is down (it really is), cannot pick an available PT, and re-dispatches the task; deadlock complete, game over T_T). Fixed by guaranteeing that ProcessorTracker creation always succeeds: if the processor fails to build, every task submitted afterwards immediately returns an error (see the sketch after this list).
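A hedged sketch of the "construction never fails" idea; the names are illustrative, not the project's actual code:

```java
// If the processor cannot be built, remember the failure instead of
// throwing: the ProcessorTracker still exists, still heartbeats, and every
// submitted task is failed immediately rather than silently dropped.
class ProcessorTrackerSketch {
    private final Exception processorBuildFailure;

    ProcessorTrackerSketch(String processorInfo) {
        Exception failure = null;
        try {
            buildProcessor(processorInfo);
        } catch (Exception e) {
            failure = e;
        }
        this.processorBuildFailure = failure; // construction always succeeds
    }

    void submitTask(String taskId) {
        if (processorBuildFailure != null) {
            reportFailure(taskId, "SYSTEM: processor build failed, " + processorBuildFailure);
            return;
        }
        // ... normal execution path ...
    }

    private void buildProcessor(String processorInfo) throws Exception { /* load & instantiate */ }
    private void reportFailure(String taskId, String reason) { /* report back to the TaskTracker */ }
}
```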
#### Client
* StopInstance -> success
* FetchInstanceStatus -> success


@@ -1,34 +0,0 @@
# Container Testing Log
## The ClassNotFound problem
> Hot-reloading just isn't the real thing without a few ClassNotFound errors [doge]

The following error showed up while testing a containerized MapReduce task:
```text
2020-05-19 09:33:18 ERROR - [ProcessorRunnable-142925055284740224] execute failed, please fix this bug @tjq!
com.esotericsoftware.kryo.KryoException: Unable to find class: cn.edu.zju.oms.container.ContainerMRProcessor$TestSubTask
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:182)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:151)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:684)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:795)
at SerializerUtils.deSerialized(SerializerUtils.java:48)
at ProcessorRunnable.innerRun(ProcessorRunnable.java:63)
at ProcessorRunnable.run(ProcessorRunnable.java:179)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: cn.edu.zju.oms.container.ContainerMRProcessor$TestSubTask
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:176)
... 12 common frames omitted
```
* Root cause: analysis shows that during serialization and deserialization, the framework used an **object pool** for performance (archived code: a14f554e0085b6a179375a8ca04665434b73c7bd#SerializerUtils), while Kryo resolves classes with one fixed class loader (the loader of Kryo.class itself), so it cannot find container classes created by the custom OMS class loader.
* Fix: abandon the high-performance object pool in favor of a ThreadLocal Kryo with its class loader set manually (sketch below).


@@ -1,100 +0,0 @@
## V1.0.0
#### Persistence path
1. The client reports to the server asynchronously, in batches, through an in-memory queue
2. The server writes every incoming request straight into the H2 database
3. When the task finishes, the data is streamed into MongoDB for persistent storage (kept as one MongoDB document containing an Array)
4. Once the sync completes, all local data is deleted
#### Query path
* If the data exists locally, return it straight from the local database
* Otherwise connect directly to MongoDB, fetch the data, and return it (sketch below)
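Roughly, the query path reads like this (repository names are hypothetical):

```java
import java.util.List;

// Local-first query: while an instance's log is still buffered in H2 it is
// served locally; once it has been synced to MongoDB and wiped locally,
// the remote archive is the source of truth.
class LogQueryService {
    interface LocalRepo { List<String> findByInstanceId(long instanceId); }
    interface MongoArchive { List<String> fetch(long instanceId); }

    private final LocalRepo localRepo;
    private final MongoArchive mongoArchive;

    LogQueryService(LocalRepo localRepo, MongoArchive mongoArchive) {
        this.localRepo = localRepo;
        this.mongoArchive = mongoArchive;
    }

    List<String> queryLog(long instanceId) {
        List<String> local = localRepo.findByInstanceId(instanceId);
        return local.isEmpty() ? mongoArchive.fetch(instanceId) : local;
    }
}
```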
***
The main problem is front-end display. With a 1,000,000-record test, the local H2 file takes 82 MB; MongoDB's footprint is unknown (the mongo shell refuses to work for some reason...), but it can't be much smaller. At that size the data cannot even be sent back to the browser... the design needs rethinking.
```text
org.apache.catalina.connector.ClientAbortException: java.io.IOException: Broken pipe
at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:351)
at org.apache.catalina.connector.OutputBuffer.flushByteBuffer(OutputBuffer.java:776)
at org.apache.catalina.connector.OutputBuffer.append(OutputBuffer.java:681)
at org.apache.catalina.connector.OutputBuffer.writeBytes(OutputBuffer.java:386)
at org.apache.catalina.connector.OutputBuffer.write(OutputBuffer.java:364)
at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:96)
at com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:2137)
at com.fasterxml.jackson.core.json.UTF8JsonGenerator.flush(UTF8JsonGenerator.java:1150)
at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:923)
at org.springframework.http.converter.json.AbstractJackson2HttpMessageConverter.writeInternal(AbstractJackson2HttpMessageConverter.java:287)
at org.springframework.http.converter.AbstractGenericHttpMessageConverter.write(AbstractGenericHttpMessageConverter.java:104)
at org.springframework.web.servlet.mvc.method.annotation.AbstractMessageConverterMethodProcessor.writeWithMessageConverters(AbstractMessageConverterMethodProcessor.java:287)
at org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor.handleReturnValue(RequestResponseBodyMethodProcessor.java:181)
at org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite.handleReturnValue(HandlerMethodReturnValueHandlerComposite.java:82)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:123)
at org.springframework.web.servlet.mvc.method.annotation.ExceptionHandlerExceptionResolver.doResolveHandlerMethodException(ExceptionHandlerExceptionResolver.java:403)
at org.springframework.web.servlet.handler.AbstractHandlerMethodExceptionResolver.doResolveException(AbstractHandlerMethodExceptionResolver.java:61)
at org.springframework.web.servlet.handler.AbstractHandlerExceptionResolver.resolveException(AbstractHandlerExceptionResolver.java:141)
at org.springframework.web.servlet.handler.HandlerExceptionResolverComposite.resolveException(HandlerExceptionResolverComposite.java:80)
at org.springframework.web.servlet.DispatcherServlet.processHandlerException(DispatcherServlet.java:1300)
at org.springframework.web.servlet.DispatcherServlet.processDispatchResult(DispatcherServlet.java:1111)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1057)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:373)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1594)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:138)
at org.apache.tomcat.util.net.NioBlockingSelector.write(NioBlockingSelector.java:101)
at org.apache.tomcat.util.net.NioSelectorPool.write(NioSelectorPool.java:152)
at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1253)
at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:740)
at org.apache.tomcat.util.net.SocketWrapperBase.writeBlocking(SocketWrapperBase.java:560)
at org.apache.tomcat.util.net.SocketWrapperBase.write(SocketWrapperBase.java:504)
at org.apache.coyote.http11.Http11OutputBuffer$SocketOutputBuffer.doWrite(Http11OutputBuffer.java:538)
at org.apache.coyote.http11.filters.ChunkedOutputFilter.doWrite(ChunkedOutputFilter.java:110)
at org.apache.coyote.http11.Http11OutputBuffer.doWrite(Http11OutputBuffer.java:190)
at org.apache.coyote.Response.doWrite(Response.java:601)
at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:339)
... 60 common frames omitted
```
## V2.0.0
> After a little investigation, MongoDB seems to let users store files directly through its built-in file system, GridFS. So should this switch to a file-to-file model? When a sync starts, generate the log file locally first, then sync it to MongoDB; on query, download the file first. Once the complete file is at hand, pagination and the like become easy to implement (e.g. the front end shows 1,000 lines at a time). An exploratory sketch follows.
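An exploratory sketch of the GridFS idea using the official MongoDB sync driver (the connection string, bucket name, and file paths are made up):

```java
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import org.bson.types.ObjectId;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

// Upload the locally generated log file into GridFS when the sync starts;
// download the whole file back on query, then paginate it locally.
public class GridFsLogStore {
    public static void main(String[] args) throws Exception {
        MongoDatabase db = MongoClients.create("mongodb://localhost:27017").getDatabase("oms");
        GridFSBucket bucket = GridFSBuckets.create(db, "instance_log");

        // sync phase: push the finished local log file
        try (InputStream in = new FileInputStream("/tmp/instance-1586311659084.log")) {
            ObjectId fileId = bucket.uploadFromStream("instance-1586311659084.log", in);
            System.out.println("stored as " + fileId);
        }

        // query phase: pull the complete file back before paginating
        try (OutputStream out = new FileOutputStream("/tmp/instance-1586311659084.download.log")) {
            bucket.downloadToStream("instance-1586311659084.log", out);
        }
    }
}
```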


@@ -193,7 +193,7 @@ public class InstanceLogService {
try {
instanceId2LastReportTime.remove(instanceId);
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
log.warn("[InstanceLog-{}] delete local instanceLog successfully.", instanceId);
log.info("[InstanceLog-{}] delete local instanceLog successfully.", instanceId);
}catch (Exception e) {
log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
}


@@ -29,7 +29,7 @@
<file>${LOG_PATH}/powerjob-agent-error.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${LOG_PATH}/powerjob-agent-error.%d{yyyy-MM-dd}.log</FileNamePattern>
-<MaxHistory>7</MaxHistory>
+<MaxHistory>3</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
@@ -49,7 +49,7 @@
<file>${LOG_PATH}/powerjob-agent-application.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${LOG_PATH}/powerjob-agent-application.%d{yyyy-MM-dd}.log</FileNamePattern>
-<MaxHistory>7</MaxHistory>
+<MaxHistory>3</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>


@@ -36,6 +36,7 @@ public class BroadcastProcessorDemo extends BroadcastProcessor {
public ProcessResult process(TaskContext taskContext) throws Exception {
System.out.println("===== BroadcastProcessorDemo#process ======");
taskContext.getOmsLogger().info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost());
+Thread.sleep(45 * 1000);
return new ProcessResult(true);
}


@@ -65,7 +65,7 @@ public class TaskTrackerActor extends AbstractActor {
taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult());
}
-taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
+taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
}
/**


@@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Nullable;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -104,6 +105,14 @@ public class FrequentTaskTracker extends TaskTracker {
scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS);
}
+@Override
+public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
+    super.updateTaskStatus(subInstanceId, taskId, newStatus, reportTime, result);
+    // update lastActiveTime
+    SubInstanceTimeHolder timeHolder = subInstanceId2TimeHolder.get(subInstanceId);
+    timeHolder.lastActiveTime = Math.max(reportTime, timeHolder.lastActiveTime);
+}
@Override
public InstanceDetail fetchRunningStatus() {
InstanceDetail detail = new InstanceDetail();
@@ -144,15 +153,10 @@
// subtask instance ID
Long subInstanceId = triggerTimes.incrementAndGet();
-// record the timing
-SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder();
-timeHolder.startTime = timeHolder.lastActiveTime = System.currentTimeMillis();
-subInstanceId2TimeHolder.put(subInstanceId, timeHolder);
-// execution record cache
+// execution record cache: display-only, so it can safely stay up front
SubInstanceInfo subInstanceInfo = new SubInstanceInfo();
subInstanceInfo.status = TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue();
-subInstanceInfo.startTime = timeHolder.startTime;
+subInstanceInfo.startTime = System.currentTimeMillis();
recentSubInstanceInfo.put(subInstanceId, subInstanceInfo);
String myAddress = OhMyWorker.getWorkerAddress();
@@ -191,6 +195,11 @@ public class FrequentTaskTracker extends TaskTracker {
return;
}
+// create the timing record; it may only be created after persistence has succeeded, otherwise a LAUNCH_FAILED error results
+SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder();
+timeHolder.startTime = timeHolder.lastActiveTime = System.currentTimeMillis();
+subInstanceId2TimeHolder.put(subInstanceId, timeHolder);
dispatchTask(newRootTask, myAddress);
}
@@ -243,9 +252,13 @@
long heartbeatTimeout = nowTS - timeHolder.lastActiveTime;
// a timeout (total runtime timeout or heartbeat timeout) is judged as failure immediately
-if (executeTimeout > instanceTimeoutMS || heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) {
-    onFinished(subInstanceId, false, "TIMEOUT", iterator);
+if (executeTimeout > instanceTimeoutMS) {
+    onFinished(subInstanceId, false, "RUNNING_TIMEOUT", iterator);
+    continue;
+}
+if (heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) {
+    onFinished(subInstanceId, false, "HEARTBEAT_TIMEOUT", iterator);
    continue;
}
@@ -295,14 +308,11 @@
newLastTask.setAddress(OhMyWorker.getWorkerAddress());
submitTask(Lists.newArrayList(newLastTask));
}
}
}
-// drop every retry mechanism: a timeout simply means failure!
-log.debug("[TaskTracker-{}] check status using {}.", instanceId, stopwatch.stop());
}
log.debug("[TaskTracker-{}] check status using {}.", instanceId, stopwatch);
}
private void reportStatus() {


@@ -138,12 +138,13 @@ public abstract class TaskTracker {
/**
 * Update a task's status.
 * V1.0.0 -> V1.0.1 (e405e283ad7f97b0b4e5d369c7de884c0caf9192): lock scheme changed from synchronized (taskId.intern()) to segment locks, which greatly cuts memory usage and costs nothing but theoretical concurrency
+ * @param subInstanceId subtask instance ID
 * @param taskId ID of the task (a task is the execution unit of a job instance)
 * @param newStatus the task's new status
 * @param reportTime report time
 * @param result the task's execution result (empty until the task finishes)
 */
-public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) {
+public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
if (finished.get()) {
return;
@@ -278,7 +279,7 @@
List<TaskDO> unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress);
if (!CollectionUtils.isEmpty(unfinishedTask)) {
log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask);
-unfinishedTask.forEach(task -> updateTaskStatus(task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
+unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
}
}
}