diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java index f490146c..42bdc8fe 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java @@ -13,10 +13,10 @@ import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.processor.TaskResult; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; /** * 任务持久化服务 @@ -36,6 +36,11 @@ public class TaskPersistenceService { private static final long RETRY_INTERVAL_MS = 100; + /** + * 慢查询定义:200ms + */ + private static final long SLOW_QUERY_RT_THRESHOLD = 200; + private TaskDAO taskDAO; public TaskPersistenceService(StoreStrategy strategy) { @@ -54,7 +59,7 @@ public class TaskPersistenceService { public boolean save(TaskDO task) { try { - return execute(() -> taskDAO.save(task)); + return execute(() -> taskDAO.save(task), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] taskId={} save cost {}ms", task.getInstanceId(), task.getTaskId(), cost)); }catch (Exception e) { log.error("[TaskPersistenceService] save task{} failed.", task, e); } @@ -66,7 +71,7 @@ public class TaskPersistenceService { return true; } try { - return execute(() -> taskDAO.batchSave(tasks)); + return execute(() -> taskDAO.batchSave(tasks), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] batchSave cost {}ms", tasks.get(0).getInstanceId(), cost)); }catch (Exception e) { log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e); } @@ -80,7 +85,7 @@ public class TaskPersistenceService { try { updateEntity.setLastModifiedTime(System.currentTimeMillis()); SimpleTaskQuery query = genKeyQuery(instanceId, taskId); - return execute(() -> taskDAO.simpleUpdate(query, updateEntity)); + return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTask(taskId={}) cost {}ms", instanceId, taskId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] updateTask failed.", e); } @@ -92,7 +97,7 @@ public class TaskPersistenceService { */ public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) { try { - return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result)); + return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTaskStatus(taskId={}) cost {}ms", instanceId, taskId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] updateTaskStatus failed.", e); } @@ -125,7 +130,7 @@ public class TaskPersistenceService { log.debug("[TaskPersistenceService] updateLostTasks-QUERY-SQL: {}", query.getQueryCondition()); try { - return execute(() -> taskDAO.simpleUpdate(query, updateEntity)); + return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateLostTasks cost {}ms", instanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] updateLostTasks failed.", e); } @@ -148,7 +153,7 @@ public class TaskPersistenceService { return Optional.empty(); } return Optional.of(taskDOS.get(0)); - }); + }, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getLastTask cost {}ms", instanceId, subInstanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] get last task for instance(id={}) failed.", instanceId, e); } @@ -161,7 +166,7 @@ public class TaskPersistenceService { SimpleTaskQuery query = new SimpleTaskQuery(); query.setInstanceId(instanceId); query.setSubInstanceId(subInstanceId); - return execute(() -> taskDAO.simpleQuery(query)); + return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTask cost {}ms", instanceId, subInstanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e); } @@ -178,7 +183,7 @@ public class TaskPersistenceService { query.setAddress(address); query.setQueryCondition(condition); - return execute(() -> taskDAO.simpleQuery(query)); + return execute(() -> taskDAO.simpleQuery(query) , cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getAllUnFinishedTaskByAddress({}) cost {}ms", instanceId, address, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] getAllTaskByAddress for instance(id={}) failed.", instanceId, e); } @@ -194,7 +199,7 @@ public class TaskPersistenceService { query.setInstanceId(instanceId); query.setStatus(status.getValue()); query.setLimit(limit); - return execute(() -> taskDAO.simpleQuery(query)); + return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTaskByStatus({}) cost {}ms", instanceId, status, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] getTaskByStatus failed, params is instanceId={},status={}.", instanceId, status, e); } @@ -224,7 +229,7 @@ public class TaskPersistenceService { result.put(TaskStatus.of(status), num); }); return result; - }); + }, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getTaskStatusStatistics cost {}ms", instanceId, subInstanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e); } @@ -236,31 +241,13 @@ public class TaskPersistenceService { */ public List getAllTaskResult(Long instanceId, Long subInstanceId) { try { - return execute(() -> taskDAO.getAllTaskResult(instanceId, subInstanceId)); + return execute(() -> taskDAO.getAllTaskResult(instanceId, subInstanceId), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTaskResult cost {}ms", instanceId, subInstanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] getTaskId2ResultMap for instance(id={}) failed.", instanceId, e); } return Lists.newLinkedList(); } - /** - * 查询任务状态(只查询 status,节约 I/O 资源 -> 测试表明,在(我高端的NVMe)SSD上都效果惊人...别说一般的HDD了...磁盘I/O果然是重要瓶颈...) - */ - public Optional getTaskStatus(Long instanceId, String taskId) { - - try { - SimpleTaskQuery query = genKeyQuery(instanceId, taskId); - query.setQueryContent("status"); - return execute(() -> { - List> rows = taskDAO.simpleQueryPlus(query); - return Optional.of(TaskStatus.of((int) rows.get(0).get("status"))); - }); - }catch (Exception e) { - log.error("[TaskPersistenceService] getTaskStatus failed, instanceId={},taskId={}.", instanceId, taskId, e); - } - return Optional.empty(); - } - /** * 根据主键查询 Task */ @@ -273,7 +260,7 @@ public class TaskPersistenceService { return Optional.empty(); } return Optional.of(res.get(0)); - }); + }, cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTask(taskId={}) cost {}ms", instanceId, taskId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] getTask failed, instanceId={},taskId={}.", instanceId, taskId, e); } @@ -281,35 +268,11 @@ public class TaskPersistenceService { } - /** - * 批量更新 Task 状态 - */ - public boolean batchUpdateTaskStatus(Long instanceId, List taskIds, TaskStatus status, String result) { - try { - return execute(() -> { - - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setQueryCondition(String.format(" task_id in %s ", CommonUtils.getInStringCondition(taskIds))); - - TaskDO updateEntity = new TaskDO(); - updateEntity.setStatus(status.getValue()); - updateEntity.setResult(result); - return taskDAO.simpleUpdate(query, updateEntity); - }); - }catch (Exception e) { - log.error("[TaskPersistenceService] updateTaskStatus failed, instanceId={},taskIds={},status={},result={}.", - instanceId, taskIds, status, result, e); - } - return false; - } - - public boolean deleteAllTasks(Long instanceId) { try { SimpleTaskQuery condition = new SimpleTaskQuery(); condition.setInstanceId(instanceId); - return execute(() -> taskDAO.simpleDelete(condition)); + return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] deleteAllTasks cost {}ms", instanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e); } @@ -321,26 +284,13 @@ public class TaskPersistenceService { SimpleTaskQuery condition = new SimpleTaskQuery(); condition.setInstanceId(instanceId); condition.setSubInstanceId(subInstanceId); - return execute(() -> taskDAO.simpleDelete(condition)); + return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] deleteAllSubInstanceTasks cost {}ms", instanceId, subInstanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e); } return false; } - public List listAll() { - try { - return execute(() -> { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setQueryCondition("1 = 1"); - return taskDAO.simpleQuery(query); - }); - }catch (Exception e) { - log.error("[TaskPersistenceService] listAll failed.", e); - } - return Collections.emptyList(); - } - private static SimpleTaskQuery genKeyQuery(Long instanceId, String taskId) { SimpleTaskQuery condition = new SimpleTaskQuery(); condition.setInstanceId(instanceId); @@ -348,7 +298,15 @@ public class TaskPersistenceService { return condition; } - private static T execute(SupplierPlus executor) throws Exception { - return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS); + private static T execute(SupplierPlus executor, Consumer slowQueryLogger) throws Exception { + long s = System.currentTimeMillis(); + try { + return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS); + } finally { + long cost = System.currentTimeMillis() - s; + if (cost > SLOW_QUERY_RT_THRESHOLD) { + slowQueryLogger.accept(cost); + } + } } } diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java index 6c90b5ed..ba5e67a5 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java @@ -56,14 +56,6 @@ public class PersistenceServiceTest { Thread.sleep(60000); } - @AfterEach - public void listData() { - System.out.println("============= listData ============="); - List result = taskPersistenceService.listAll(); - System.out.println("size: " + result.size()); - result.forEach(System.out::println); - } - @Test public void testBatchSave(){