refactor: task tracker's stat info

This commit is contained in:
tjq 2024-02-21 23:19:37 +08:00
parent 4793c19af6
commit 90b740e325
5 changed files with 76 additions and 41 deletions

View File

@ -1,10 +1,12 @@
package tech.powerjob.samples.processors; package tech.powerjob.samples.processors;
import tech.powerjob.official.processors.impl.ConfigProcessor;
import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor; import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.log.OmsLogger; import tech.powerjob.worker.log.OmsLogger;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
/** /**
@ -27,6 +29,11 @@ public class SimpleProcessor implements BasicProcessor {
return new ProcessResult(true, "任务成功啦!!!"); return new ProcessResult(true, "任务成功啦!!!");
} }
// 测试配置中心获取数据
Map<String, Object> dynamicConfig = ConfigProcessor.fetchConfig();
Object valueA = dynamicConfig.get("keyA");
logger.info("[Test] dynamicConfig: {}, fetchByKeyA: {}", dynamicConfig, valueA);
return jobParams.contains("F") ? new ProcessResult(false) : new ProcessResult(true, "yeah!"); return jobParams.contains("F") ? new ProcessResult(false) : new ProcessResult(true, "yeah!");
} }

View File

@ -18,6 +18,7 @@ import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.core.tracker.task.stat.InstanceStatisticsHolder;
import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.persistence.TaskDO;
import java.util.List; import java.util.List;
@ -87,8 +88,8 @@ public class CommonTaskTracker extends HeavyTaskTracker {
// 填充详细信息 // 填充详细信息
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId); InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail(); InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail();
taskDetail.setSucceedTaskNum(holder.succeedNum); taskDetail.setSucceedTaskNum(holder.getSucceedNum());
taskDetail.setFailedTaskNum(holder.failedNum); taskDetail.setFailedTaskNum(holder.getFailedNum());
taskDetail.setTotalTaskNum(holder.getTotalTaskNum()); taskDetail.setTotalTaskNum(holder.getTotalTaskNum());
detail.setTaskDetail(taskDetail); detail.setTaskDetail(taskDetail);
@ -136,8 +137,8 @@ public class CommonTaskTracker extends HeavyTaskTracker {
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId); InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
long finishedNum = holder.succeedNum + holder.failedNum; long finishedNum = holder.getFinishedNum();
long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum; long unfinishedNum = holder.getUnfinishedNum();
log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder); log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder);
@ -147,8 +148,8 @@ public class CommonTaskTracker extends HeavyTaskTracker {
req.setInstanceId(instanceId); req.setInstanceId(instanceId);
req.setWfInstanceId(instanceInfo.getWfInstanceId()); req.setWfInstanceId(instanceInfo.getWfInstanceId());
req.setTotalTaskNum(finishedNum + unfinishedNum); req.setTotalTaskNum(finishedNum + unfinishedNum);
req.setSucceedTaskNum(holder.succeedNum); req.setSucceedTaskNum(holder.getSucceedNum());
req.setFailedTaskNum(holder.failedNum); req.setFailedTaskNum(holder.getFailedNum());
req.setReportTime(System.currentTimeMillis()); req.setReportTime(System.currentTimeMillis());
req.setStartTime(createTime); req.setStartTime(createTime);
req.setSourceAddress(workerRuntime.getWorkerAddress()); req.setSourceAddress(workerRuntime.getWorkerAddress());
@ -183,8 +184,8 @@ public class CommonTaskTracker extends HeavyTaskTracker {
// MAP 不关心结果最简单 // MAP 不关心结果最简单
case MAP: case MAP:
finished.set(true); finished.set(true);
success = holder.failedNum == 0; success = holder.getFailedNum() == 0;
result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum); result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.getSucceedNum(), holder.getFailedNum());
break; break;
// MapReduce Broadcast 任务实例是否完成根据**LastTask**的执行情况判断 // MapReduce Broadcast 任务实例是否完成根据**LastTask**的执行情况判断
default: default:
@ -239,7 +240,7 @@ public class CommonTaskTracker extends HeavyTaskTracker {
// 6.1 定期检查 -> 重试派发后未确认的任务 // 6.1 定期检查 -> 重试派发后未确认的任务
long currentMS = System.currentTimeMillis(); long currentMS = System.currentTimeMillis();
if (holder.workerUnreceivedNum != 0) { if (holder.getWorkerUnreceivedNum() != 0) {
taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> { taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {
long elapsedTime = currentMS - uncheckTask.getLastModifiedTime(); long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();

View File

@ -22,6 +22,7 @@ import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.LRUCache; import tech.powerjob.worker.common.utils.LRUCache;
import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.core.tracker.task.stat.InstanceStatisticsHolder;
import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.persistence.TaskDO;
import java.util.*; import java.util.*;
@ -283,8 +284,8 @@ public class FrequentTaskTracker extends HeavyTaskTracker {
// 查看执行情况 // 查看执行情况
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(subInstanceId); InstanceStatisticsHolder holder = getInstanceStatisticsHolder(subInstanceId);
long finishedNum = holder.succeedNum + holder.failedNum; long finishedNum = holder.getFinishedNum();
long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum; long unfinishedNum = holder.getUnfinishedNum();
if (unfinishedNum == 0) { if (unfinishedNum == 0) {
@ -304,8 +305,8 @@ public class FrequentTaskTracker extends HeavyTaskTracker {
continue; continue;
// MAP 不关心结果最简单 // MAP 不关心结果最简单
case MAP: case MAP:
String result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum); String result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.getSucceedNum(), holder.getFailedNum());
onFinished(subInstanceId, holder.failedNum == 0, result, iterator); onFinished(subInstanceId, holder.getFailedNum() == 0, result, iterator);
continue; continue;
// MapReduce BroadCast 需要根据是否有 LAST_TASK 来判断结束与否 // MapReduce BroadCast 需要根据是否有 LAST_TASK 来判断结束与否
default: default:

View File

@ -10,6 +10,7 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enhance.SafeRunnable;
import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.ServerScheduleJobReq;
@ -19,7 +20,6 @@ import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.SegmentLock; import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.common.enhance.SafeRunnable;
import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.constants.TaskStatus;
@ -28,6 +28,7 @@ import tech.powerjob.worker.common.utils.WorkflowContextUtils;
import tech.powerjob.worker.core.ha.ProcessorTrackerStatusHolder; import tech.powerjob.worker.core.ha.ProcessorTrackerStatusHolder;
import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager; import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager;
import tech.powerjob.worker.core.tracker.task.TaskTracker; import tech.powerjob.worker.core.tracker.task.TaskTracker;
import tech.powerjob.worker.core.tracker.task.stat.InstanceStatisticsHolder;
import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.persistence.TaskPersistenceService; import tech.powerjob.worker.persistence.TaskPersistenceService;
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
@ -432,12 +433,12 @@ public abstract class HeavyTaskTracker extends TaskTracker {
Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId); Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId);
InstanceStatisticsHolder holder = new InstanceStatisticsHolder(); InstanceStatisticsHolder holder = new InstanceStatisticsHolder();
holder.waitingDispatchNum = status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L); holder.setWaitingDispatchNum(status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L));
holder.workerUnreceivedNum = status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L); holder.setWorkerUnreceivedNum(status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L));
holder.receivedNum = status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L); holder.setReceivedNum(status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L));
holder.runningNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L); holder.setRunningNum(status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L));
holder.failedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L); holder.setFailedNum(status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L));
holder.succeedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L); holder.setSucceedNum(status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L));
return holder; return holder;
} }
@ -546,27 +547,6 @@ public abstract class HeavyTaskTracker extends TaskTracker {
private Long lastReportTime; private Long lastReportTime;
} }
/**
* 存储任务实例产生的各个Task状态用于分析任务实例执行情况
*/
@Data
protected static class InstanceStatisticsHolder {
// 等待派发状态仅存在 TaskTracker 数据库中
protected long waitingDispatchNum;
// 已派发 ProcessorTracker 未确认可能由于网络错误请求未送达也有可能 ProcessorTracker 线程池满拒绝执行
protected long workerUnreceivedNum;
// ProcessorTracker确认接收存在与线程池队列中排队执行
protected long receivedNum;
// ProcessorTracker正在执行
protected long runningNum;
protected long failedNum;
protected long succeedNum;
public long getTotalTaskNum() {
return waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum + failedNum + succeedNum;
}
}
/** /**
* 初始化 TaskTracker * 初始化 TaskTracker
* *

View File

@ -0,0 +1,46 @@
package tech.powerjob.worker.core.tracker.task.stat;
import lombok.Data;
import java.io.Serializable;
/**
* 存储任务实例产生的各个Task状态用于分析任务实例执行情况
*
* @author tjq
* @since 2024/2/21
*/
@Data
public class InstanceStatisticsHolder implements Serializable {
/**
* 等待派发状态仅存在 TaskTracker 数据库中
*/
private long waitingDispatchNum;
/**
* 已派发 ProcessorTracker 未确认可能由于网络错误请求未送达也有可能 ProcessorTracker 线程池满拒绝执行
*/
private long workerUnreceivedNum;
/**
* ProcessorTracker确认接收存在与线程池队列中排队执行
*/
private long receivedNum;
/**
* ProcessorTracker正在执行
*/
private long runningNum;
private long failedNum;
private long succeedNum;
public long getTotalTaskNum() {
return waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum + failedNum + succeedNum;
}
public long getFinishedNum() {
return succeedNum + failedNum;
}
public long getUnfinishedNum() {
return getTotalTaskNum() - getFinishedNum();
}
}