perf: add cost log for TaskPersistenceService

This commit is contained in:
tjq 2024-02-08 14:25:55 +08:00
parent debc2e0abb
commit 6842fb6a7b
2 changed files with 30 additions and 80 deletions

View File

@ -13,10 +13,10 @@ import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.core.processor.TaskResult;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
/**
* 任务持久化服务
@ -36,6 +36,11 @@ public class TaskPersistenceService {
private static final long RETRY_INTERVAL_MS = 100;
/**
* 慢查询定义200ms
*/
private static final long SLOW_QUERY_RT_THRESHOLD = 200;
private TaskDAO taskDAO;
public TaskPersistenceService(StoreStrategy strategy) {
@ -54,7 +59,7 @@ public class TaskPersistenceService {
public boolean save(TaskDO task) {
try {
return execute(() -> taskDAO.save(task));
return execute(() -> taskDAO.save(task), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] taskId={} save cost {}ms", task.getInstanceId(), task.getTaskId(), cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] save task{} failed.", task, e);
}
@ -66,7 +71,7 @@ public class TaskPersistenceService {
return true;
}
try {
return execute(() -> taskDAO.batchSave(tasks));
return execute(() -> taskDAO.batchSave(tasks), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] batchSave cost {}ms", tasks.get(0).getInstanceId(), cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e);
}
@ -80,7 +85,7 @@ public class TaskPersistenceService {
try {
updateEntity.setLastModifiedTime(System.currentTimeMillis());
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
return execute(() -> taskDAO.simpleUpdate(query, updateEntity));
return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTask(taskId={}) cost {}ms", instanceId, taskId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] updateTask failed.", e);
}
@ -92,7 +97,7 @@ public class TaskPersistenceService {
*/
public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) {
try {
return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result));
return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTaskStatus(taskId={}) cost {}ms", instanceId, taskId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] updateTaskStatus failed.", e);
}
@ -125,7 +130,7 @@ public class TaskPersistenceService {
log.debug("[TaskPersistenceService] updateLostTasks-QUERY-SQL: {}", query.getQueryCondition());
try {
return execute(() -> taskDAO.simpleUpdate(query, updateEntity));
return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateLostTasks cost {}ms", instanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] updateLostTasks failed.", e);
}
@ -148,7 +153,7 @@ public class TaskPersistenceService {
return Optional.empty();
}
return Optional.of(taskDOS.get(0));
});
}, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getLastTask cost {}ms", instanceId, subInstanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] get last task for instance(id={}) failed.", instanceId, e);
}
@ -161,7 +166,7 @@ public class TaskPersistenceService {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setSubInstanceId(subInstanceId);
return execute(() -> taskDAO.simpleQuery(query));
return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTask cost {}ms", instanceId, subInstanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e);
}
@ -178,7 +183,7 @@ public class TaskPersistenceService {
query.setAddress(address);
query.setQueryCondition(condition);
return execute(() -> taskDAO.simpleQuery(query));
return execute(() -> taskDAO.simpleQuery(query) , cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getAllUnFinishedTaskByAddress({}) cost {}ms", instanceId, address, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getAllTaskByAddress for instance(id={}) failed.", instanceId, e);
}
@ -194,7 +199,7 @@ public class TaskPersistenceService {
query.setInstanceId(instanceId);
query.setStatus(status.getValue());
query.setLimit(limit);
return execute(() -> taskDAO.simpleQuery(query));
return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTaskByStatus({}) cost {}ms", instanceId, status, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskByStatus failed, params is instanceId={},status={}.", instanceId, status, e);
}
@ -224,7 +229,7 @@ public class TaskPersistenceService {
result.put(TaskStatus.of(status), num);
});
return result;
});
}, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getTaskStatusStatistics cost {}ms", instanceId, subInstanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e);
}
@ -236,31 +241,13 @@ public class TaskPersistenceService {
*/
public List<TaskResult> getAllTaskResult(Long instanceId, Long subInstanceId) {
try {
return execute(() -> taskDAO.getAllTaskResult(instanceId, subInstanceId));
return execute(() -> taskDAO.getAllTaskResult(instanceId, subInstanceId), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTaskResult cost {}ms", instanceId, subInstanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskId2ResultMap for instance(id={}) failed.", instanceId, e);
}
return Lists.newLinkedList();
}
/**
* 查询任务状态只查询 status节约 I/O 资源 -> 测试表明我高端的NVMeSSD上都效果惊人...别说一般的HDD了...磁盘I/O果然是重要瓶颈...
*/
public Optional<TaskStatus> getTaskStatus(Long instanceId, String taskId) {
try {
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
query.setQueryContent("status");
return execute(() -> {
List<Map<String, Object>> rows = taskDAO.simpleQueryPlus(query);
return Optional.of(TaskStatus.of((int) rows.get(0).get("status")));
});
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskStatus failed, instanceId={},taskId={}.", instanceId, taskId, e);
}
return Optional.empty();
}
/**
* 根据主键查询 Task
*/
@ -273,7 +260,7 @@ public class TaskPersistenceService {
return Optional.empty();
}
return Optional.of(res.get(0));
});
}, cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTask(taskId={}) cost {}ms", instanceId, taskId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getTask failed, instanceId={},taskId={}.", instanceId, taskId, e);
}
@ -281,35 +268,11 @@ public class TaskPersistenceService {
}
/**
* 批量更新 Task 状态
*/
public boolean batchUpdateTaskStatus(Long instanceId, List<String> taskIds, TaskStatus status, String result) {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setQueryCondition(String.format(" task_id in %s ", CommonUtils.getInStringCondition(taskIds)));
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(status.getValue());
updateEntity.setResult(result);
return taskDAO.simpleUpdate(query, updateEntity);
});
}catch (Exception e) {
log.error("[TaskPersistenceService] updateTaskStatus failed, instanceId={},taskIds={},status={},result={}.",
instanceId, taskIds, status, result, e);
}
return false;
}
public boolean deleteAllTasks(Long instanceId) {
try {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
return execute(() -> taskDAO.simpleDelete(condition));
return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] deleteAllTasks cost {}ms", instanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e);
}
@ -321,26 +284,13 @@ public class TaskPersistenceService {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
condition.setSubInstanceId(subInstanceId);
return execute(() -> taskDAO.simpleDelete(condition));
return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] deleteAllSubInstanceTasks cost {}ms", instanceId, subInstanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e);
}
return false;
}
public List<TaskDO> listAll() {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setQueryCondition("1 = 1");
return taskDAO.simpleQuery(query);
});
}catch (Exception e) {
log.error("[TaskPersistenceService] listAll failed.", e);
}
return Collections.emptyList();
}
private static SimpleTaskQuery genKeyQuery(Long instanceId, String taskId) {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
@ -348,7 +298,15 @@ public class TaskPersistenceService {
return condition;
}
private static <T> T execute(SupplierPlus<T> executor) throws Exception {
return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS);
private static <T> T execute(SupplierPlus<T> executor, Consumer<Long> slowQueryLogger) throws Exception {
long s = System.currentTimeMillis();
try {
return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS);
} finally {
long cost = System.currentTimeMillis() - s;
if (cost > SLOW_QUERY_RT_THRESHOLD) {
slowQueryLogger.accept(cost);
}
}
}
}

View File

@ -56,14 +56,6 @@ public class PersistenceServiceTest {
Thread.sleep(60000);
}
@AfterEach
public void listData() {
System.out.println("============= listData =============");
List<TaskDO> result = taskPersistenceService.listAll();
System.out.println("size: " + result.size());
result.forEach(System.out::println);
}
@Test
public void testBatchSave(){