From 51816e619fedcd29528ea8b6a0163df145be20af Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 19 Jun 2020 18:37:08 +0800 Subject: [PATCH] [opt] optimize TaskTracker --- README.md | 2 +- README_enUS.md | 4 ++-- .../core/tracker/task/CommonTaskTracker.java | 23 ++++++++----------- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index d8236934..2eaabf71 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架, | 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** | | 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** | | 报警监控 | 无 | 邮件 | 短信 | **邮件,提供接口允许开发者扩展** | -| 系统依赖 | MySQL | MySQL | 人民币(公测期间免费,哎,帮打个广告吧) | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** | +| 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币(公测期间免费,哎,帮打个广告吧) | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** | | DAG工作流 | 不支持 | 不支持 | 支持 | **支持** | diff --git a/README_enUS.md b/README_enUS.md index 94975add..60acd36e 100644 --- a/README_enUS.md +++ b/README_enUS.md @@ -39,8 +39,8 @@ OhMyScheduler is a powerful distributed scheduling platform and distributed comp | Log blanking | not support | support | not support | **support** | | Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** | | Alarm monitoring | no | mail | SMS | **Email, providing an interface to allow developers to customize development** | -| System dependence | MySQL | MySQL | Renminbi (free during public beta, hey, help to advertise) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** | -| DAG workflow | not support | not support | support | **support** | +| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | Renminbi (free during public beta, hey, help to advertise) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** | +| workflow | not support | not support | support | **support** | # Document **[GitHub Wiki](https://github.com/KFCFans/OhMyScheduler/wiki)** diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index 848bd9e4..78b20322 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -2,15 +2,12 @@ package com.github.kfcfans.powerjob.worker.core.tracker.task; import akka.actor.ActorSelection; import akka.pattern.Patterns; -import com.github.kfcfans.powerjob.common.ExecuteType; -import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.SystemInstanceResult; +import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.OhMyWorker; -import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; @@ -105,10 +102,11 @@ public class CommonTaskTracker extends TaskTracker { rootTask.setLastReportTime(-1L); rootTask.setSubInstanceId(instanceId); - if (!taskPersistenceService.save(rootTask)) { - log.error("[TaskTracker-{}] create root task failed.", instanceId); - }else { + if (taskPersistenceService.save(rootTask)) { log.info("[TaskTracker-{}] create root task successfully.", instanceId); + }else { + log.error("[TaskTracker-{}] create root task failed.", instanceId); + throw new OmsException("create root task failed for instance: " + instanceId); } } @@ -176,7 +174,7 @@ public class CommonTaskTracker extends TaskTracker { success = holder.failedNum == 0; result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum); break; - // MapReduce 和 Broadcast 任务实例是否完成根据**Last_Task**的执行情况判断 + // MapReduce 和 Broadcast 任务实例是否完成根据**LastTask**的执行情况判断 default: Optional lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId); @@ -267,8 +265,8 @@ public class CommonTaskTracker extends TaskTracker { taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity); - log.warn("[TaskTracker-{}] task(taskId={}) try to dispatch again due to unreceived the response from ProcessorTracker.", - instanceId, uncheckTask.getTaskId()); + log.warn("[TaskTracker-{}] task(id={},name={}) try to dispatch again due to unreceived the response from ProcessorTracker.", + instanceId, uncheckTask.getTaskId(), uncheckTask.getTaskName()); } }); @@ -280,9 +278,6 @@ public class CommonTaskTracker extends TaskTracker { log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs); taskPersistenceService.updateLostTasks(disconnectedPTs); } - - // 6.3 超时检查 -> 检查超时的Task - } @Override @@ -290,7 +285,7 @@ public class CommonTaskTracker extends TaskTracker { try { innerRun(); }catch (Exception e) { - log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceInfo.getInstanceId(), e); + log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceId, e); } } }