From 90b740e32586b039ba192d5a5b24ff9b37b1abe0 Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 21 Feb 2024 23:19:37 +0800 Subject: [PATCH] refactor: task tracker's stat info --- .../samples/processors/SimpleProcessor.java | 7 +++ .../tracker/task/heavy/CommonTaskTracker.java | 19 ++++---- .../task/heavy/FrequentTaskTracker.java | 9 ++-- .../tracker/task/heavy/HeavyTaskTracker.java | 36 ++++----------- .../task/stat/InstanceStatisticsHolder.java | 46 +++++++++++++++++++ 5 files changed, 76 insertions(+), 41 deletions(-) create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/InstanceStatisticsHolder.java diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/SimpleProcessor.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/SimpleProcessor.java index b82b9cf8..d659f2bb 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/SimpleProcessor.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/SimpleProcessor.java @@ -1,10 +1,12 @@ package tech.powerjob.samples.processors; +import tech.powerjob.official.processors.impl.ConfigProcessor; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.sdk.BasicProcessor; import tech.powerjob.worker.log.OmsLogger; +import java.util.Map; import java.util.Optional; /** @@ -27,6 +29,11 @@ public class SimpleProcessor implements BasicProcessor { return new ProcessResult(true, "任务成功啦!!!"); } + // 测试配置中心获取数据 + Map dynamicConfig = ConfigProcessor.fetchConfig(); + Object valueA = dynamicConfig.get("keyA"); + logger.info("[Test] dynamicConfig: {}, fetchByKeyA: {}", dynamicConfig, valueA); + return jobParams.contains("F") ? new ProcessResult(false) : new ProcessResult(true, "yeah!"); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java index 8853f310..aa71ba35 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java @@ -18,6 +18,7 @@ import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.TransportUtils; +import tech.powerjob.worker.core.tracker.task.stat.InstanceStatisticsHolder; import tech.powerjob.worker.persistence.TaskDO; import java.util.List; @@ -87,8 +88,8 @@ public class CommonTaskTracker extends HeavyTaskTracker { // 填充详细信息 InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId); InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail(); - taskDetail.setSucceedTaskNum(holder.succeedNum); - taskDetail.setFailedTaskNum(holder.failedNum); + taskDetail.setSucceedTaskNum(holder.getSucceedNum()); + taskDetail.setFailedTaskNum(holder.getFailedNum()); taskDetail.setTotalTaskNum(holder.getTotalTaskNum()); detail.setTaskDetail(taskDetail); @@ -136,8 +137,8 @@ public class CommonTaskTracker extends HeavyTaskTracker { InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId); - long finishedNum = holder.succeedNum + holder.failedNum; - long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum; + long finishedNum = holder.getFinishedNum(); + long unfinishedNum = holder.getUnfinishedNum(); log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder); @@ -147,8 +148,8 @@ public class CommonTaskTracker extends HeavyTaskTracker { req.setInstanceId(instanceId); req.setWfInstanceId(instanceInfo.getWfInstanceId()); req.setTotalTaskNum(finishedNum + unfinishedNum); - req.setSucceedTaskNum(holder.succeedNum); - req.setFailedTaskNum(holder.failedNum); + req.setSucceedTaskNum(holder.getSucceedNum()); + req.setFailedTaskNum(holder.getFailedNum()); req.setReportTime(System.currentTimeMillis()); req.setStartTime(createTime); req.setSourceAddress(workerRuntime.getWorkerAddress()); @@ -183,8 +184,8 @@ public class CommonTaskTracker extends HeavyTaskTracker { // MAP 不关心结果,最简单 case MAP: finished.set(true); - success = holder.failedNum == 0; - result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum); + success = holder.getFailedNum() == 0; + result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.getSucceedNum(), holder.getFailedNum()); break; // MapReduce 和 Broadcast 任务实例是否完成根据**LastTask**的执行情况判断 default: @@ -239,7 +240,7 @@ public class CommonTaskTracker extends HeavyTaskTracker { // 6.1 定期检查 -> 重试派发后未确认的任务 long currentMS = System.currentTimeMillis(); - if (holder.workerUnreceivedNum != 0) { + if (holder.getWorkerUnreceivedNum() != 0) { taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> { long elapsedTime = currentMS - uncheckTask.getLastModifiedTime(); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java index 891ce021..3b1a0bc5 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java @@ -22,6 +22,7 @@ import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.LRUCache; import tech.powerjob.worker.common.utils.TransportUtils; +import tech.powerjob.worker.core.tracker.task.stat.InstanceStatisticsHolder; import tech.powerjob.worker.persistence.TaskDO; import java.util.*; @@ -283,8 +284,8 @@ public class FrequentTaskTracker extends HeavyTaskTracker { // 查看执行情况 InstanceStatisticsHolder holder = getInstanceStatisticsHolder(subInstanceId); - long finishedNum = holder.succeedNum + holder.failedNum; - long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum; + long finishedNum = holder.getFinishedNum(); + long unfinishedNum = holder.getUnfinishedNum(); if (unfinishedNum == 0) { @@ -304,8 +305,8 @@ public class FrequentTaskTracker extends HeavyTaskTracker { continue; // MAP 不关心结果,最简单 case MAP: - String result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum); - onFinished(subInstanceId, holder.failedNum == 0, result, iterator); + String result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.getSucceedNum(), holder.getFailedNum()); + onFinished(subInstanceId, holder.getFailedNum() == 0, result, iterator); continue; // MapReduce 和 BroadCast 需要根据是否有 LAST_TASK 来判断结束与否 default: diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 3d5db645..dc8c49f3 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -10,6 +10,7 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.enhance.SafeRunnable; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.request.ServerScheduleJobReq; @@ -19,7 +20,6 @@ import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.SegmentLock; -import tech.powerjob.common.enhance.SafeRunnable; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; @@ -28,6 +28,7 @@ import tech.powerjob.worker.common.utils.WorkflowContextUtils; import tech.powerjob.worker.core.ha.ProcessorTrackerStatusHolder; import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager; import tech.powerjob.worker.core.tracker.task.TaskTracker; +import tech.powerjob.worker.core.tracker.task.stat.InstanceStatisticsHolder; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.persistence.TaskPersistenceService; import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; @@ -432,12 +433,12 @@ public abstract class HeavyTaskTracker extends TaskTracker { Map status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId); InstanceStatisticsHolder holder = new InstanceStatisticsHolder(); - holder.waitingDispatchNum = status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L); - holder.workerUnreceivedNum = status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L); - holder.receivedNum = status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L); - holder.runningNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L); - holder.failedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L); - holder.succeedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L); + holder.setWaitingDispatchNum(status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L)); + holder.setWorkerUnreceivedNum(status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L)); + holder.setReceivedNum(status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L)); + holder.setRunningNum(status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L)); + holder.setFailedNum(status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L)); + holder.setSucceedNum(status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L)); return holder; } @@ -546,27 +547,6 @@ public abstract class HeavyTaskTracker extends TaskTracker { private Long lastReportTime; } - /** - * 存储任务实例产生的各个Task状态,用于分析任务实例执行情况 - */ - @Data - protected static class InstanceStatisticsHolder { - // 等待派发状态(仅存在 TaskTracker 数据库中) - protected long waitingDispatchNum; - // 已派发,但 ProcessorTracker 未确认,可能由于网络错误请求未送达,也有可能 ProcessorTracker 线程池满,拒绝执行 - protected long workerUnreceivedNum; - // ProcessorTracker确认接收,存在与线程池队列中,排队执行 - protected long receivedNum; - // ProcessorTracker正在执行 - protected long runningNum; - protected long failedNum; - protected long succeedNum; - - public long getTotalTaskNum() { - return waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum + failedNum + succeedNum; - } - } - /** * 初始化 TaskTracker * diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/InstanceStatisticsHolder.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/InstanceStatisticsHolder.java new file mode 100644 index 00000000..973cb80e --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/InstanceStatisticsHolder.java @@ -0,0 +1,46 @@ +package tech.powerjob.worker.core.tracker.task.stat; + +import lombok.Data; + +import java.io.Serializable; + +/** + * 存储任务实例产生的各个Task状态,用于分析任务实例执行情况 + * + * @author tjq + * @since 2024/2/21 + */ +@Data +public class InstanceStatisticsHolder implements Serializable { + + /** + * 等待派发状态(仅存在 TaskTracker 数据库中) + */ + private long waitingDispatchNum; + /** + * 已派发,但 ProcessorTracker 未确认,可能由于网络错误请求未送达,也有可能 ProcessorTracker 线程池满,拒绝执行 + */ + private long workerUnreceivedNum; + /** + * ProcessorTracker确认接收,存在与线程池队列中,排队执行 + */ + private long receivedNum; + /** + * ProcessorTracker正在执行 + */ + private long runningNum; + private long failedNum; + private long succeedNum; + + public long getTotalTaskNum() { + return waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum + failedNum + succeedNum; + } + + public long getFinishedNum() { + return succeedNum + failedNum; + } + + public long getUnfinishedNum() { + return getTotalTaskNum() - getFinishedNum(); + } +}