[opt] optimize TaskTracker

This commit is contained in:
tjq 2020-06-19 18:37:08 +08:00
parent 8c0a746b57
commit 51816e619f
3 changed files with 12 additions and 17 deletions

View File

@ -38,7 +38,7 @@ PowerJob原OhMyScheduler是全新一代分布式调度与计算框架
| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** |
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** |
| 报警监控 | 无 | 邮件 | 短信 | **邮件,提供接口允许开发者扩展** |
| 系统依赖 | MySQL | MySQL | 人民币(公测期间免费,哎,帮打个广告吧) | **任意Spring Data Jpa支持的关系型数据库MySQL、Oracle...** |
| 系统依赖 | JDBC支持的关系型数据库MySQL、Oracle... | MySQL | 人民币(公测期间免费,哎,帮打个广告吧) | **任意Spring Data Jpa支持的关系型数据库MySQL、Oracle...** |
| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** |

View File

@ -39,8 +39,8 @@ OhMyScheduler is a powerful distributed scheduling platform and distributed comp
| Log blanking | not support | support | not support | **support** |
| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** |
| Alarm monitoring | no | mail | SMS | **Email, providing an interface to allow developers to customize development** |
| System dependence | MySQL | MySQL | Renminbi (free during public beta, hey, help to advertise) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
| DAG workflow | not support | not support | support | **support** |
| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | Renminbi (free during public beta, hey, help to advertise) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
| workflow | not support | not support | support | **support** |
# Document
**[GitHub Wiki](https://github.com/KFCFans/OhMyScheduler/wiki)**

View File

@ -2,15 +2,12 @@ package com.github.kfcfans.powerjob.worker.core.tracker.task;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.SystemInstanceResult;
import com.github.kfcfans.powerjob.common.*;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
@ -105,10 +102,11 @@ public class CommonTaskTracker extends TaskTracker {
rootTask.setLastReportTime(-1L);
rootTask.setSubInstanceId(instanceId);
if (!taskPersistenceService.save(rootTask)) {
log.error("[TaskTracker-{}] create root task failed.", instanceId);
}else {
if (taskPersistenceService.save(rootTask)) {
log.info("[TaskTracker-{}] create root task successfully.", instanceId);
}else {
log.error("[TaskTracker-{}] create root task failed.", instanceId);
throw new OmsException("create root task failed for instance: " + instanceId);
}
}
@ -176,7 +174,7 @@ public class CommonTaskTracker extends TaskTracker {
success = holder.failedNum == 0;
result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
break;
// MapReduce Broadcast 任务实例是否完成根据**Last_Task**的执行情况判断
// MapReduce Broadcast 任务实例是否完成根据**LastTask**的执行情况判断
default:
Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
@ -267,8 +265,8 @@ public class CommonTaskTracker extends TaskTracker {
taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity);
log.warn("[TaskTracker-{}] task(taskId={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
instanceId, uncheckTask.getTaskId());
log.warn("[TaskTracker-{}] task(id={},name={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
instanceId, uncheckTask.getTaskId(), uncheckTask.getTaskName());
}
});
@ -280,9 +278,6 @@ public class CommonTaskTracker extends TaskTracker {
log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
taskPersistenceService.updateLostTasks(disconnectedPTs);
}
// 6.3 超时检查 -> 检查超时的Task
}
@Override
@ -290,7 +285,7 @@ public class CommonTaskTracker extends TaskTracker {
try {
innerRun();
}catch (Exception e) {
log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceInfo.getInstanceId(), e);
log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceId, e);
}
}
}