From dda79439cae093dd448dfbded66a2d9bf5a41aae Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 23 Feb 2024 23:07:16 +0800 Subject: [PATCH] feat: [SuperMR] ExternalTaskPersistenceService --- .../tech/powerjob/worker/PowerJobWorker.java | 3 +- .../persistence/DbTaskPersistenceService.java | 334 ++++++++++++++++++ .../persistence/TaskPersistenceService.java | 296 +--------------- .../{ => db}/ConnectionFactory.java | 2 +- .../persistence/{ => db}/SimpleTaskQuery.java | 2 +- .../worker/persistence/{ => db}/TaskDAO.java | 3 +- .../persistence/{ => db}/TaskDAOImpl.java | 4 +- .../fs/ExternalTaskPersistenceService.java | 24 ++ .../worker/persistence/fs/FsService.java | 17 + ...ernalTaskFileSystemPersistenceService.java | 116 ++++++ .../fs/impl/LocalDiskFsService.java | 88 +++++ .../persistence/TaskDAOPerformanceTest.java | 5 +- .../worker/persistence/TaskDAOTest.java | 6 +- .../worker/test/PersistenceServiceTest.java | 3 +- 14 files changed, 613 insertions(+), 290 deletions(-) create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java rename powerjob-worker/src/main/java/tech/powerjob/worker/persistence/{ => db}/ConnectionFactory.java (98%) rename powerjob-worker/src/main/java/tech/powerjob/worker/persistence/{ => db}/SimpleTaskQuery.java (97%) rename powerjob-worker/src/main/java/tech/powerjob/worker/persistence/{ => db}/TaskDAO.java (93%) rename powerjob-worker/src/main/java/tech/powerjob/worker/persistence/{ => db}/TaskDAOImpl.java (99%) create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/ExternalTaskPersistenceService.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/FsService.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/LocalDiskFsService.java diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index 404da4ca..053ad4c2 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -26,6 +26,7 @@ import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.utils.WorkerNetUtils; import tech.powerjob.worker.core.executor.ExecutorManager; import tech.powerjob.worker.extension.processor.ProcessorFactory; +import tech.powerjob.worker.persistence.DbTaskPersistenceService; import tech.powerjob.worker.persistence.TaskPersistenceService; import tech.powerjob.worker.processor.PowerJobProcessorLoader; import tech.powerjob.worker.processor.ProcessorLoader; @@ -122,7 +123,7 @@ public class PowerJobWorker { workerRuntime.setOmsLogHandler(omsLogHandler); // 初始化存储 - TaskPersistenceService taskPersistenceService = new TaskPersistenceService(workerRuntime.getWorkerConfig().getStoreStrategy()); + TaskPersistenceService taskPersistenceService = new DbTaskPersistenceService(workerRuntime.getWorkerConfig().getStoreStrategy()); taskPersistenceService.init(); workerRuntime.setTaskPersistenceService(taskPersistenceService); log.info("[PowerJobWorker] local storage initialized successfully."); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java new file mode 100644 index 00000000..c297cb3a --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java @@ -0,0 +1,334 @@ +package tech.powerjob.worker.persistence; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.utils.CollectionUtils; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.common.utils.SupplierPlus; +import tech.powerjob.worker.common.constants.StoreStrategy; +import tech.powerjob.worker.common.constants.TaskConstant; +import tech.powerjob.worker.common.constants.TaskStatus; +import tech.powerjob.worker.core.processor.TaskResult; +import tech.powerjob.worker.persistence.db.ConnectionFactory; +import tech.powerjob.worker.persistence.db.SimpleTaskQuery; +import tech.powerjob.worker.persistence.db.TaskDAO; +import tech.powerjob.worker.persistence.db.TaskDAOImpl; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; + +/** + * desc + * + * @author tjq + * @since 2024/2/23 + */ +@Slf4j +public class DbTaskPersistenceService implements TaskPersistenceService { + + private final StoreStrategy strategy; + + /** + * 默认重试次数 + */ + private static final int RETRY_TIMES = 3; + + private static final long RETRY_INTERVAL_MS = 100; + + /** + * 慢查询定义:200ms + */ + private static final long SLOW_QUERY_RT_THRESHOLD = 200; + + private TaskDAO taskDAO; + + public DbTaskPersistenceService(StoreStrategy strategy) { + this.strategy = strategy; + } + + @Override + public void init() throws Exception { + + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.initDatasource(strategy); + + taskDAO = new TaskDAOImpl(connectionFactory); + taskDAO.initTable(); + } + + @Override + public boolean save(TaskDO task) { + + try { + return execute(() -> taskDAO.save(task), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] taskId={} save cost {}ms", task.getInstanceId(), task.getTaskId(), cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] save task{} failed.", task, e); + } + return false; + } + + @Override + public boolean batchSave(List tasks) { + if (CollectionUtils.isEmpty(tasks)) { + return true; + } + try { + return execute(() -> taskDAO.batchSave(tasks), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] batchSave cost {}ms", tasks.get(0).getInstanceId(), cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e); + } + return false; + } + + /** + * 依靠主键更新 Task(不涉及 result 的,都可以用该方法更新) + */ + @Override + public boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity) { + try { + updateEntity.setLastModifiedTime(System.currentTimeMillis()); + SimpleTaskQuery query = genKeyQuery(instanceId, taskId); + return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTask(taskId={}) cost {}ms", instanceId, taskId, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] updateTask failed.", e); + } + return false; + } + + /** + * 更新任务状态 + */ + @Override + public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) { + try { + return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTaskStatus(taskId={}) cost {}ms", instanceId, taskId, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] updateTaskStatus failed.", e); + } + return false; + } + + /** + * 更新被派发到已经失联的 ProcessorTracker 的任务,重新执行 + * update task_info + * set address = 'N/A', status = 0 + * where address in () and status not in (5,6) and instance_id = 277 + */ + @Override + public boolean updateLostTasks(Long instanceId, List addressList, boolean retry) { + + TaskDO updateEntity = new TaskDO(); + updateEntity.setLastModifiedTime(System.currentTimeMillis()); + if (retry) { + updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); + updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); + }else { + updateEntity.setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue()); + updateEntity.setResult("maybe worker down"); + } + + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + String queryConditionFormat = "address in %s and status not in (%d, %d)"; + String queryCondition = String.format(queryConditionFormat, CommonUtils.getInStringCondition(addressList), TaskStatus.WORKER_PROCESS_FAILED.getValue(), TaskStatus.WORKER_PROCESS_SUCCESS.getValue()); + query.setQueryCondition(queryCondition); + log.debug("[TaskPersistenceService] updateLostTasks-QUERY-SQL: {}", query.getQueryCondition()); + + try { + return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateLostTasks cost {}ms", instanceId, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] updateLostTasks failed.", e); + } + return false; + } + + /** + * 获取 MapReduce 或 Broadcast 的最后一个任务 + */ + @Override + public Optional getLastTask(Long instanceId, Long subInstanceId) { + + try { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setSubInstanceId(subInstanceId); + query.setTaskName(TaskConstant.LAST_TASK_NAME); + return execute(() -> { + List taskDOS = taskDAO.simpleQuery(query); + if (CollectionUtils.isEmpty(taskDOS)) { + return Optional.empty(); + } + return Optional.of(taskDOS.get(0)); + }, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getLastTask cost {}ms", instanceId, subInstanceId, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] get last task for instance(id={}) failed.", instanceId, e); + } + + return Optional.empty(); + } + + @Override + public List getAllTask(Long instanceId, Long subInstanceId) { + try { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setSubInstanceId(subInstanceId); + return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTask cost {}ms", instanceId, subInstanceId, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e); + } + return Lists.newArrayList(); + } + + /** + * 获取某个 ProcessorTracker 未完成的任务 + * @param instanceId instanceId + * @param address address + * @return result + */ + @Override + public List getAllUnFinishedTaskByAddress(Long instanceId, String address) { + try { + String condition = String.format("status not in (%d, %d)", TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), TaskStatus.WORKER_PROCESS_FAILED.getValue()); + + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setAddress(address); + query.setQueryCondition(condition); + + return execute(() -> taskDAO.simpleQuery(query) , cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getAllUnFinishedTaskByAddress({}) cost {}ms", instanceId, address, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] getAllTaskByAddress for instance(id={}) failed.", instanceId, e); + } + return Lists.newArrayList(); + } + + /** + * 获取指定状态的Task + */ + @Override + public List getTaskByStatus(Long instanceId, TaskStatus status, int limit) { + try { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setStatus(status.getValue()); + query.setLimit(limit); + return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTaskByStatus({}) cost {}ms", instanceId, status, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] getTaskByStatus failed, params is instanceId={},status={}.", instanceId, status, e); + } + return Lists.newArrayList(); + } + + /** + * 获取 TaskTracker 管理的子 task 状态统计信息 + * TaskStatus -> num + */ + @Override + public Map getTaskStatusStatistics(Long instanceId, Long subInstanceId) { + try { + + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setSubInstanceId(subInstanceId); + query.setQueryContent("status, count(*) as num"); + query.setOtherCondition("GROUP BY status"); + + return execute(() -> { + List> dbRES = taskDAO.simpleQueryPlus(query); + Map result = Maps.newHashMap(); + dbRES.forEach(row -> { + // H2 数据库都是大写... + int status = Integer.parseInt(String.valueOf(row.get("status"))); + long num = Long.parseLong(String.valueOf(row.get("num"))); + result.put(TaskStatus.of(status), num); + }); + return result; + }, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getTaskStatusStatistics cost {}ms", instanceId, subInstanceId, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e); + } + return Maps.newHashMap(); + } + + /** + * 查询所有Task执行结果,reduce阶段 或 postProcess阶段 使用 + */ + @Override + public List getAllTaskResult(Long instanceId, Long subInstanceId) { + try { + return execute(() -> taskDAO.getAllTaskResult(instanceId, subInstanceId), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTaskResult cost {}ms", instanceId, subInstanceId, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] getTaskId2ResultMap for instance(id={}) failed.", instanceId, e); + } + return Lists.newLinkedList(); + } + + /** + * 根据主键查询 Task + */ + @Override + public Optional getTask(Long instanceId, String taskId) { + try { + SimpleTaskQuery query = genKeyQuery(instanceId, taskId); + return execute(() -> { + List res = taskDAO.simpleQuery(query); + if (CollectionUtils.isEmpty(res)) { + return Optional.empty(); + } + return Optional.of(res.get(0)); + }, cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTask(taskId={}) cost {}ms", instanceId, taskId, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] getTask failed, instanceId={},taskId={}.", instanceId, taskId, e); + } + return Optional.empty(); + } + + @Override + public boolean deleteAllTasks(Long instanceId) { + try { + SimpleTaskQuery condition = new SimpleTaskQuery(); + condition.setInstanceId(instanceId); + return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] deleteAllTasks cost {}ms", instanceId, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e); + } + return false; + } + + @Override + public boolean deleteAllSubInstanceTasks(Long instanceId, Long subInstanceId) { + try { + SimpleTaskQuery condition = new SimpleTaskQuery(); + condition.setInstanceId(instanceId); + condition.setSubInstanceId(subInstanceId); + return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] deleteAllSubInstanceTasks cost {}ms", instanceId, subInstanceId, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e); + } + return false; + } + + private static SimpleTaskQuery genKeyQuery(Long instanceId, String taskId) { + SimpleTaskQuery condition = new SimpleTaskQuery(); + condition.setInstanceId(instanceId); + condition.setTaskId(taskId); + return condition; + } + + private static T execute(SupplierPlus executor, Consumer slowQueryLogger) throws Exception { + long s = System.currentTimeMillis(); + try { + return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS); + } finally { + long cost = System.currentTimeMillis() - s; + if (cost > SLOW_QUERY_RT_THRESHOLD) { + slowQueryLogger.accept(cost); + } + } + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java index 42bdc8fe..6e90acac 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java @@ -1,22 +1,12 @@ package tech.powerjob.worker.persistence; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import lombok.extern.slf4j.Slf4j; -import tech.powerjob.common.RemoteConstant; -import tech.powerjob.common.utils.CollectionUtils; -import tech.powerjob.common.utils.CommonUtils; -import tech.powerjob.common.utils.SupplierPlus; -import tech.powerjob.worker.common.constants.StoreStrategy; -import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.processor.TaskResult; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Consumer; /** * 任务持久化服务 @@ -24,289 +14,35 @@ import java.util.function.Consumer; * @author tjq * @since 2020/3/17 */ -@Slf4j -public class TaskPersistenceService { +public interface TaskPersistenceService { - private final StoreStrategy strategy; + void init() throws Exception; - /** - * 默认重试次数 - */ - private static final int RETRY_TIMES = 3; + boolean save(TaskDO task); - private static final long RETRY_INTERVAL_MS = 100; + boolean batchSave(List tasks); - /** - * 慢查询定义:200ms - */ - private static final long SLOW_QUERY_RT_THRESHOLD = 200; + boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity); - private TaskDAO taskDAO; + boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result); - public TaskPersistenceService(StoreStrategy strategy) { - this.strategy = strategy; - } + boolean updateLostTasks(Long instanceId, List addressList, boolean retry); - public void init() throws Exception { + Optional getLastTask(Long instanceId, Long subInstanceId); - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.initDatasource(strategy); + List getAllTask(Long instanceId, Long subInstanceId); - taskDAO = new TaskDAOImpl(connectionFactory); - taskDAO.initTable(); - } + List getAllUnFinishedTaskByAddress(Long instanceId, String address); - public boolean save(TaskDO task) { + List getTaskByStatus(Long instanceId, TaskStatus status, int limit); - try { - return execute(() -> taskDAO.save(task), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] taskId={} save cost {}ms", task.getInstanceId(), task.getTaskId(), cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] save task{} failed.", task, e); - } - return false; - } + Map getTaskStatusStatistics(Long instanceId, Long subInstanceId); - public boolean batchSave(List tasks) { - if (CollectionUtils.isEmpty(tasks)) { - return true; - } - try { - return execute(() -> taskDAO.batchSave(tasks), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] batchSave cost {}ms", tasks.get(0).getInstanceId(), cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e); - } - return false; - } + List getAllTaskResult(Long instanceId, Long subInstanceId); - /** - * 依靠主键更新 Task(不涉及 result 的,都可以用该方法更新) - */ - public boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity) { - try { - updateEntity.setLastModifiedTime(System.currentTimeMillis()); - SimpleTaskQuery query = genKeyQuery(instanceId, taskId); - return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTask(taskId={}) cost {}ms", instanceId, taskId, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] updateTask failed.", e); - } - return false; - } + Optional getTask(Long instanceId, String taskId); - /** - * 更新任务状态 - */ - public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) { - try { - return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTaskStatus(taskId={}) cost {}ms", instanceId, taskId, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] updateTaskStatus failed.", e); - } - return false; - } + boolean deleteAllTasks(Long instanceId); - /** - * 更新被派发到已经失联的 ProcessorTracker 的任务,重新执行 - * update task_info - * set address = 'N/A', status = 0 - * where address in () and status not in (5,6) and instance_id = 277 - */ - public boolean updateLostTasks(Long instanceId, List addressList, boolean retry) { - - TaskDO updateEntity = new TaskDO(); - updateEntity.setLastModifiedTime(System.currentTimeMillis()); - if (retry) { - updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); - updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); - }else { - updateEntity.setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue()); - updateEntity.setResult("maybe worker down"); - } - - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - String queryConditionFormat = "address in %s and status not in (%d, %d)"; - String queryCondition = String.format(queryConditionFormat, CommonUtils.getInStringCondition(addressList), TaskStatus.WORKER_PROCESS_FAILED.getValue(), TaskStatus.WORKER_PROCESS_SUCCESS.getValue()); - query.setQueryCondition(queryCondition); - log.debug("[TaskPersistenceService] updateLostTasks-QUERY-SQL: {}", query.getQueryCondition()); - - try { - return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateLostTasks cost {}ms", instanceId, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] updateLostTasks failed.", e); - } - return false; - } - - /** - * 获取 MapReduce 或 Broadcast 的最后一个任务 - */ - public Optional getLastTask(Long instanceId, Long subInstanceId) { - - try { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setSubInstanceId(subInstanceId); - query.setTaskName(TaskConstant.LAST_TASK_NAME); - return execute(() -> { - List taskDOS = taskDAO.simpleQuery(query); - if (CollectionUtils.isEmpty(taskDOS)) { - return Optional.empty(); - } - return Optional.of(taskDOS.get(0)); - }, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getLastTask cost {}ms", instanceId, subInstanceId, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] get last task for instance(id={}) failed.", instanceId, e); - } - - return Optional.empty(); - } - - public List getAllTask(Long instanceId, Long subInstanceId) { - try { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setSubInstanceId(subInstanceId); - return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTask cost {}ms", instanceId, subInstanceId, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e); - } - return Lists.newArrayList(); - } - - // 获取某个 ProcessorTracker 未完成的任务 - public List getAllUnFinishedTaskByAddress(Long instanceId, String address) { - try { - String condition = String.format("status not in (%d, %d)", TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), TaskStatus.WORKER_PROCESS_FAILED.getValue()); - - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setAddress(address); - query.setQueryCondition(condition); - - return execute(() -> taskDAO.simpleQuery(query) , cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getAllUnFinishedTaskByAddress({}) cost {}ms", instanceId, address, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] getAllTaskByAddress for instance(id={}) failed.", instanceId, e); - } - return Lists.newArrayList(); - } - - /** - * 获取指定状态的Task - */ - public List getTaskByStatus(Long instanceId, TaskStatus status, int limit) { - try { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setStatus(status.getValue()); - query.setLimit(limit); - return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTaskByStatus({}) cost {}ms", instanceId, status, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] getTaskByStatus failed, params is instanceId={},status={}.", instanceId, status, e); - } - return Lists.newArrayList(); - } - - /** - * 获取 TaskTracker 管理的子 task 状态统计信息 - * TaskStatus -> num - */ - public Map getTaskStatusStatistics(Long instanceId, Long subInstanceId) { - try { - - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setSubInstanceId(subInstanceId); - query.setQueryContent("status, count(*) as num"); - query.setOtherCondition("GROUP BY status"); - - return execute(() -> { - List> dbRES = taskDAO.simpleQueryPlus(query); - Map result = Maps.newHashMap(); - dbRES.forEach(row -> { - // H2 数据库都是大写... - int status = Integer.parseInt(String.valueOf(row.get("status"))); - long num = Long.parseLong(String.valueOf(row.get("num"))); - result.put(TaskStatus.of(status), num); - }); - return result; - }, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getTaskStatusStatistics cost {}ms", instanceId, subInstanceId, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e); - } - return Maps.newHashMap(); - } - - /** - * 查询所有Task执行结果,reduce阶段 或 postProcess阶段 使用 - */ - public List getAllTaskResult(Long instanceId, Long subInstanceId) { - try { - return execute(() -> taskDAO.getAllTaskResult(instanceId, subInstanceId), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTaskResult cost {}ms", instanceId, subInstanceId, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] getTaskId2ResultMap for instance(id={}) failed.", instanceId, e); - } - return Lists.newLinkedList(); - } - - /** - * 根据主键查询 Task - */ - public Optional getTask(Long instanceId, String taskId) { - try { - SimpleTaskQuery query = genKeyQuery(instanceId, taskId); - return execute(() -> { - List res = taskDAO.simpleQuery(query); - if (CollectionUtils.isEmpty(res)) { - return Optional.empty(); - } - return Optional.of(res.get(0)); - }, cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTask(taskId={}) cost {}ms", instanceId, taskId, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] getTask failed, instanceId={},taskId={}.", instanceId, taskId, e); - } - return Optional.empty(); - } - - - public boolean deleteAllTasks(Long instanceId) { - try { - SimpleTaskQuery condition = new SimpleTaskQuery(); - condition.setInstanceId(instanceId); - return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] deleteAllTasks cost {}ms", instanceId, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e); - } - return false; - } - - public boolean deleteAllSubInstanceTasks(Long instanceId, Long subInstanceId) { - try { - SimpleTaskQuery condition = new SimpleTaskQuery(); - condition.setInstanceId(instanceId); - condition.setSubInstanceId(subInstanceId); - return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] deleteAllSubInstanceTasks cost {}ms", instanceId, subInstanceId, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e); - } - return false; - } - - private static SimpleTaskQuery genKeyQuery(Long instanceId, String taskId) { - SimpleTaskQuery condition = new SimpleTaskQuery(); - condition.setInstanceId(instanceId); - condition.setTaskId(taskId); - return condition; - } - - private static T execute(SupplierPlus executor, Consumer slowQueryLogger) throws Exception { - long s = System.currentTimeMillis(); - try { - return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS); - } finally { - long cost = System.currentTimeMillis() - s; - if (cost > SLOW_QUERY_RT_THRESHOLD) { - slowQueryLogger.accept(cost); - } - } - } + boolean deleteAllSubInstanceTasks(Long instanceId, Long subInstanceId); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/ConnectionFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/ConnectionFactory.java similarity index 98% rename from powerjob-worker/src/main/java/tech/powerjob/worker/persistence/ConnectionFactory.java rename to powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/ConnectionFactory.java index bd502361..1e8f95b7 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/ConnectionFactory.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/ConnectionFactory.java @@ -1,4 +1,4 @@ -package tech.powerjob.worker.persistence; +package tech.powerjob.worker.persistence.db; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.JavaUtils; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SimpleTaskQuery.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java similarity index 97% rename from powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SimpleTaskQuery.java rename to powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java index 0e9aec88..b811fb7b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SimpleTaskQuery.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java @@ -1,4 +1,4 @@ -package tech.powerjob.worker.persistence; +package tech.powerjob.worker.persistence.db; import lombok.Data; import org.apache.commons.lang3.StringUtils; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAO.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAO.java similarity index 93% rename from powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAO.java rename to powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAO.java index 1edf7e5d..8ca8ff8d 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAO.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAO.java @@ -1,6 +1,7 @@ -package tech.powerjob.worker.persistence; +package tech.powerjob.worker.persistence.db; import tech.powerjob.worker.core.processor.TaskResult; +import tech.powerjob.worker.persistence.TaskDO; import java.sql.SQLException; import java.util.Collection; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAOImpl.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAOImpl.java similarity index 99% rename from powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAOImpl.java rename to powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAOImpl.java index 9863831d..3159d64f 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAOImpl.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAOImpl.java @@ -1,10 +1,10 @@ -package tech.powerjob.worker.persistence; +package tech.powerjob.worker.persistence.db; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.processor.TaskResult; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import lombok.AllArgsConstructor; +import tech.powerjob.worker.persistence.TaskDO; import java.sql.*; import java.util.Collection; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/ExternalTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/ExternalTaskPersistenceService.java new file mode 100644 index 00000000..cec8dfa0 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/ExternalTaskPersistenceService.java @@ -0,0 +1,24 @@ +package tech.powerjob.worker.persistence.fs; + + +import tech.powerjob.worker.persistence.TaskDO; + +import java.io.Closeable; +import java.util.List; + +/** + * 外部任务持久化服务 + * + * @author tjq + * @since 2024/2/22 + */ +public interface ExternalTaskPersistenceService extends Closeable { + + boolean persistPendingTask(List tasks); + + List readPendingTask(); + + boolean persistFinishedTask(List tasks); + + List readFinishedTask(); +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/FsService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/FsService.java new file mode 100644 index 00000000..83af20b1 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/FsService.java @@ -0,0 +1,17 @@ +package tech.powerjob.worker.persistence.fs; + +import java.io.Closeable; +import java.io.IOException; + +/** + * FileSystemService + * + * @author tjq + * @since 2024/2/22 + */ +public interface FsService extends Closeable { + + void writeLine(String content) throws IOException; + + String readLine() throws IOException; +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java new file mode 100644 index 00000000..997ca658 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java @@ -0,0 +1,116 @@ +package tech.powerjob.worker.persistence.fs.impl; + +import com.google.common.collect.Lists; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService; +import tech.powerjob.worker.persistence.fs.FsService; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * 外部文件存储服务 + * + * @author tjq + * @since 2024/2/22 + */ +@Slf4j +public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPersistenceService { + private final Long instanceId; + + private final FsService pendingFsService; + + private final FsService resultFsService; + + private static final String PENDING_FILE_NAME = "%d-pending"; + private static final String RESULT_FILE_NAME = "%d-result"; + + public ExternalTaskFileSystemPersistenceService(Long instanceId, boolean needResult) { + this.instanceId = instanceId; + + this.pendingFsService = new LocalDiskFsService(String.format(PENDING_FILE_NAME, instanceId)); + if (needResult) { + this.resultFsService = new LocalDiskFsService(String.format(RESULT_FILE_NAME, instanceId)); + } else { + this.resultFsService = new FsService() { + @Override + public void writeLine(String content) throws IOException { + } + + @Override + public String readLine() throws IOException { + return null; + } + @Override + public void close() { + } + }; + } + } + + @Override + public boolean persistPendingTask(List tasks) { + try { + String content = JsonUtils.toJSONString(tasks); + pendingFsService.writeLine(content); + return true; + } catch (Exception e) { + log.error("[ExternalTaskPersistenceService] [{}] persistPendingTask failed: {}", instanceId, tasks); + } + return false; + } + + @Override + @SneakyThrows + public List readPendingTask() { + String pendingTaskStr = pendingFsService.readLine(); + TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class); + if (taskDOS != null) { + return Lists.newArrayList(taskDOS); + } + return Collections.emptyList(); + } + + @Override + public boolean persistFinishedTask(List tasks) { + try { + String content = JsonUtils.toJSONString(tasks); + resultFsService.writeLine(content); + return true; + } catch (Exception e) { + log.error("[ExternalTaskPersistenceService] [{}] persistPendingTask failed: {}", instanceId, tasks); + } + return false; + } + + @Override + @SneakyThrows + public List readFinishedTask() { + String pendingTaskStr = resultFsService.readLine(); + TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class); + if (taskDOS != null) { + return Lists.newArrayList(taskDOS); + } + return Collections.emptyList(); + } + + @Override + public void close() { + CommonUtils.executeIgnoreException(() -> { + if (pendingFsService != null) { + pendingFsService.close(); + } + }); + + CommonUtils.executeIgnoreException(() -> { + if (resultFsService != null) { + resultFsService.close(); + } + }); + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/LocalDiskFsService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/LocalDiskFsService.java new file mode 100644 index 00000000..b267646f --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/LocalDiskFsService.java @@ -0,0 +1,88 @@ +package tech.powerjob.worker.persistence.fs.impl; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.worker.common.utils.PowerFileUtils; +import tech.powerjob.worker.persistence.fs.FsService; + +import java.io.*; + +/** + * 本地磁盘 + * + * @author tjq + * @since 2024/2/22 + */ +@Slf4j +public class LocalDiskFsService implements FsService { + + private static final String WORKSPACE_PATH = PowerFileUtils.workspace() + "/fs/" + CommonUtils.genUUID() + "/"; + + private static final String FILE_NAME_PATTERN = "%s.powerjob"; + + private final File file; + private final BufferedWriter bufferedWriter; + + private final BufferedReader bufferedReader; + + @SneakyThrows + public LocalDiskFsService(String keyword) { + String fileName = String.format(FILE_NAME_PATTERN, keyword); + String filePath = WORKSPACE_PATH.concat(fileName); + + this.file = new File(filePath); + FileUtils.createParentDirectories(file); + + // 在使用 BufferedReader 包装 FileReader 的情况下,不需要单独关闭 FileReader。当你调用 BufferedReader 的 close() 方法时,它会负责关闭它所包装的 FileReader。这是因为 BufferedReader.close() 方法内部会调用它所包装的流的 close() 方法,确保所有相关资源都被释放,包括底层的文件句柄 + FileWriter fileWriter = new FileWriter(file); + this.bufferedWriter = new BufferedWriter(fileWriter); + this.bufferedReader = new BufferedReader(new FileReader(file)); + + log.info("[LocalDiskFsService] new LocalDiskFsService successfully, path: {}", filePath); + } + + /** + * 按行写数据,线程不安全,考虑到此处不用太在意性能,直接 synchronized + * @param content 内容 + * @throws IOException 异常 + */ + @Override + public synchronized void writeLine(String content) throws IOException { + bufferedWriter.write(content); + bufferedWriter.newLine(); + bufferedWriter.flush(); + } + + /** + * 按行读数据,线程不安全,考虑到此处不用太在意性能,直接 synchronized + * @return 内容 + * @throws IOException 异常 + */ + @Override + public synchronized String readLine() throws IOException { + return bufferedReader.readLine(); + } + + @Override + public void close() { + + CommonUtils.executeIgnoreException(() -> { + if (bufferedWriter != null) { + bufferedWriter.close(); + } + }); + + CommonUtils.executeIgnoreException(() -> { + if (bufferedReader != null) { + bufferedReader.close(); + } + }); + + CommonUtils.executeIgnoreException(() -> { + boolean delete = file.delete(); + log.info("[LocalDiskFsService] delete file[{}] result: {}", file, delete); + }); + } +} diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOPerformanceTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOPerformanceTest.java index 39014fc7..8a56102b 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOPerformanceTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOPerformanceTest.java @@ -2,12 +2,15 @@ package tech.powerjob.worker.persistence; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.util.StopWatch; import tech.powerjob.worker.common.constants.StoreStrategy; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.processor.TaskResult; +import tech.powerjob.worker.persistence.db.ConnectionFactory; +import tech.powerjob.worker.persistence.db.SimpleTaskQuery; +import tech.powerjob.worker.persistence.db.TaskDAO; +import tech.powerjob.worker.persistence.db.TaskDAOImpl; import java.util.List; import java.util.Map; diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOTest.java index 7c02082f..88d4bfa1 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOTest.java @@ -3,14 +3,16 @@ package tech.powerjob.worker.persistence; import com.google.common.collect.Lists; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import tech.powerjob.worker.common.constants.StoreStrategy; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.processor.TaskResult; +import tech.powerjob.worker.persistence.db.ConnectionFactory; +import tech.powerjob.worker.persistence.db.SimpleTaskQuery; +import tech.powerjob.worker.persistence.db.TaskDAO; +import tech.powerjob.worker.persistence.db.TaskDAOImpl; -import java.nio.charset.StandardCharsets; import java.sql.SQLIntegrityConstraintViolationException; import java.util.List; import java.util.Map; diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java index 52357f3e..075c1a5b 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java @@ -3,6 +3,7 @@ package tech.powerjob.worker.test; import tech.powerjob.worker.common.constants.StoreStrategy; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.worker.persistence.DbTaskPersistenceService; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.persistence.TaskPersistenceService; import com.google.common.collect.Lists; @@ -21,7 +22,7 @@ import static tech.powerjob.worker.core.tracker.task.heavy.CommonTaskTracker.ROO */ public class PersistenceServiceTest { - private static TaskPersistenceService taskPersistenceService = new TaskPersistenceService(StoreStrategy.DISK); + private static final TaskPersistenceService taskPersistenceService = new DbTaskPersistenceService(StoreStrategy.DISK); @BeforeAll public static void initTable() throws Exception {