diff --git a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java index 92c3a353..bd8d5b3d 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java @@ -55,4 +55,11 @@ public class PowerJobDKey { */ public static final String FREQUENCY_JOB_MAX_INTERVAL = "powerjob.server.frequency-job.max-interval"; + /* ******************* 不太可能有人用的参数 ******************* */ + + /** + * 最大活跃任务数量,超出部分 SWAP 到磁盘以提升性能 + */ + public static final String WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM = "powerjob.worker.max-active-task-num"; + } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/MapUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/MapUtils.java new file mode 100644 index 00000000..135d2556 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/MapUtils.java @@ -0,0 +1,49 @@ +package tech.powerjob.common.utils; + +import java.text.NumberFormat; +import java.text.ParseException; +import java.util.Map; + +/** + * MapUtils + * + * @author tjq + * @since 2024/2/24 + */ +public class MapUtils { + + public static long getLongValue(Map map, K key) { + Long longObject = getLong(map, key); + return longObject == null ? 0L : longObject; + } + + public static Long getLong(Map map, K key) { + Number answer = getNumber(map, key); + if (answer == null) { + return null; + } else { + return answer instanceof Long ? (Long)answer : answer.longValue(); + } + } + + public static Number getNumber(Map map, K key) { + if (map != null) { + Object answer = map.get(key); + if (answer != null) { + if (answer instanceof Number) { + return (Number)answer; + } + + if (answer instanceof String) { + try { + String text = (String)answer; + return NumberFormat.getInstance().parse(text); + } catch (ParseException var4) { + } + } + } + } + + return null; + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java index 8853f310..004c7d4b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java @@ -18,6 +18,7 @@ import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.TransportUtils; +import tech.powerjob.worker.core.processor.TaskResult; import tech.powerjob.worker.persistence.TaskDO; import java.util.List; @@ -115,7 +116,7 @@ public class CommonTaskTracker extends HeavyTaskTracker { rootTask.setLastReportTime(-1L); rootTask.setSubInstanceId(instanceId); - if (taskPersistenceService.save(rootTask)) { + if (taskPersistenceService.batchSave(Lists.newArrayList(rootTask))) { log.info("[TaskTracker-{}] create root task successfully.", instanceId); } else { log.error("[TaskTracker-{}] create root task failed.", instanceId); @@ -171,13 +172,13 @@ public class CommonTaskTracker extends HeavyTaskTracker { // STANDALONE 只有一个任务,完成即结束 case STANDALONE: finished.set(true); - List allTask = taskPersistenceService.getAllTask(instanceId, instanceId); - if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) { + List allTaskResult = taskPersistenceService.getAllTaskResult(instanceId, instanceId); + if (CollectionUtils.isEmpty(allTaskResult) || allTaskResult.size() > 1) { result = SystemInstanceResult.UNKNOWN_BUG; log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId); } else { - result = allTask.get(0).getResult(); - success = allTask.get(0).getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(); + result = allTaskResult.get(0).getResult(); + success = allTaskResult.get(0).isSuccess(); } break; // MAP 不关心结果,最简单 diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java index 891ce021..9f706963 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java @@ -22,6 +22,7 @@ import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.LRUCache; import tech.powerjob.worker.common.utils.TransportUtils; +import tech.powerjob.worker.core.processor.TaskResult; import tech.powerjob.worker.persistence.TaskDO; import java.util.*; @@ -203,7 +204,7 @@ public class FrequentTaskTracker extends HeavyTaskTracker { } // 必须先持久化,持久化成功才能 dispatch,否则会导致后续报错(因为DB中没有这个taskId对应的记录,会各种报错) - if (!taskPersistenceService.save(newRootTask)) { + if (!taskPersistenceService.batchSave(Lists.newArrayList(newRootTask))) { log.error("[FQTaskTracker-{}] Launcher create new root task failed.", instanceId); processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED"); return; @@ -298,9 +299,8 @@ public class FrequentTaskTracker extends HeavyTaskTracker { // STANDALONE 代表任务确实已经执行完毕了 case STANDALONE: // 查询数据库获取结果(STANDALONE每个SubInstance只会有一条Task记录) - TaskDO resultTask = taskPersistenceService.getAllTask(instanceId, subInstanceId).get(0); - boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(); - onFinished(subInstanceId, success, resultTask.getResult(), iterator); + TaskResult resultTask = taskPersistenceService.getAllTaskResult(instanceId, subInstanceId).get(0); + onFinished(subInstanceId, resultTask.isSuccess(), resultTask.getResult(), iterator); continue; // MAP 不关心结果,最简单 case MAP: diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 3d5db645..5f42c035 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -57,7 +57,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { /** * 数据库持久化服务 */ - protected final TaskPersistenceService taskPersistenceService; + protected TaskPersistenceService taskPersistenceService; /** * 定时任务线程池 */ diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java index c297cb3a..307a4836 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java @@ -16,6 +16,7 @@ import tech.powerjob.worker.persistence.db.SimpleTaskQuery; import tech.powerjob.worker.persistence.db.TaskDAO; import tech.powerjob.worker.persistence.db.TaskDAOImpl; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -60,17 +61,6 @@ public class DbTaskPersistenceService implements TaskPersistenceService { taskDAO.initTable(); } - @Override - public boolean save(TaskDO task) { - - try { - return execute(() -> taskDAO.save(task), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] taskId={} save cost {}ms", task.getInstanceId(), task.getTaskId(), cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] save task{} failed.", task, e); - } - return false; - } - @Override public boolean batchSave(List tasks) { if (CollectionUtils.isEmpty(tasks)) { @@ -171,19 +161,6 @@ public class DbTaskPersistenceService implements TaskPersistenceService { return Optional.empty(); } - @Override - public List getAllTask(Long instanceId, Long subInstanceId) { - try { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setSubInstanceId(subInstanceId); - return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTask cost {}ms", instanceId, subInstanceId, cost)); - }catch (Exception e) { - log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e); - } - return Lists.newArrayList(); - } - /** * 获取某个 ProcessorTracker 未完成的任务 * @param instanceId instanceId @@ -313,6 +290,19 @@ public class DbTaskPersistenceService implements TaskPersistenceService { return false; } + @Override + public boolean deleteTasksByTaskIds(Long instanceId, Collection taskId) { + try { + SimpleTaskQuery condition = new SimpleTaskQuery(); + condition.setInstanceId(instanceId); + condition.setTaskIds(taskId); + return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] deleteTasksByTaskIds cost {}ms", instanceId, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] deleteTasksByTaskIds failed, instanceId={}.", instanceId, e); + } + return false; + } + private static SimpleTaskQuery genKeyQuery(Long instanceId, String taskId) { SimpleTaskQuery condition = new SimpleTaskQuery(); condition.setInstanceId(instanceId); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java new file mode 100644 index 00000000..4a410162 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java @@ -0,0 +1,320 @@ +package tech.powerjob.worker.persistence; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.PowerJobDKey; +import tech.powerjob.common.enhance.SafeRunnable; +import tech.powerjob.common.utils.CollectionUtils; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.common.utils.MapUtils; +import tech.powerjob.worker.common.constants.TaskStatus; +import tech.powerjob.worker.core.processor.TaskResult; +import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService; +import tech.powerjob.worker.persistence.fs.impl.ExternalTaskFileSystemPersistenceService; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.LongAdder; +import java.util.stream.Collectors; + +/** + * SWAP:换入换出降低运行时开销 + * + * @author tjq + * @since 2024/2/23 + */ +@Slf4j +public class SwapTaskPersistenceService implements TaskPersistenceService { + + private final Long instanceId; + private final long maxActiveTaskNum; + /** + * 数据库记录数量,不要求完全精确,仅用于控制存哪里,有一定容忍度 + */ + private final LongAdder dbRecordNum = new LongAdder(); + /** + * 外部存储的任务数量,必须精确,否则会导致任务卡住 + */ + private final LongAdder externalPendingRecordNum = new LongAdder(); + private final LongAdder externalSucceedRecordNum = new LongAdder(); + private final LongAdder externalFailedRecordNum = new LongAdder(); + + private final boolean needResult; + private final TaskPersistenceService dbTaskPersistenceService; + + private boolean swapEnabled; + private ExternalTaskPersistenceService externalTaskPersistenceService; + + private static final long DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM = 100000; + + /** + * 默认工作频率 + */ + private static final long DEFAULT_SCHEDULE_TIME = 60000; + + public SwapTaskPersistenceService(Long instanceId, boolean needResult, TaskPersistenceService dbTaskPersistenceService) { + this.instanceId = instanceId; + this.needResult = needResult; + this.dbTaskPersistenceService = dbTaskPersistenceService; + this.maxActiveTaskNum = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM, String.valueOf(DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM))); + } + + @Override + public void init() throws Exception { + } + + @Override + public boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity) { + return dbTaskPersistenceService.updateTask(instanceId, taskId, updateEntity); + } + + @Override + public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) { + return dbTaskPersistenceService.updateTaskStatus(instanceId, taskId, status, lastReportTime, result); + } + + @Override + public boolean updateLostTasks(Long instanceId, List addressList, boolean retry) { + return dbTaskPersistenceService.updateLostTasks(instanceId, addressList, retry); + } + + @Override + public Optional getLastTask(Long instanceId, Long subInstanceId) { + return dbTaskPersistenceService.getLastTask(instanceId, subInstanceId); + } + + @Override + public List getAllUnFinishedTaskByAddress(Long instanceId, String address) { + return dbTaskPersistenceService.getAllUnFinishedTaskByAddress(instanceId, address); + } + + @Override + public List getTaskByStatus(Long instanceId, TaskStatus status, int limit) { + return dbTaskPersistenceService.getTaskByStatus(instanceId, status, limit); + } + + @Override + public Optional getTask(Long instanceId, String taskId) { + return dbTaskPersistenceService.getTask(instanceId, taskId); + } + + @Override + public boolean deleteAllSubInstanceTasks(Long instanceId, Long subInstanceId) { + return dbTaskPersistenceService.deleteAllSubInstanceTasks(instanceId, subInstanceId); + } + + @Override + public boolean deleteTasksByTaskIds(Long instanceId, Collection taskId) { + return dbTaskPersistenceService.deleteTasksByTaskIds(instanceId, taskId); + } + + /* 重写区 */ + + @Override + public boolean batchSave(List tasks) { + + long dbNum = dbRecordNum.sum(); + + if (dbNum > maxActiveTaskNum) { + + // 上层保证启用 SWAP 的任务,batchSave 的都是等待调度的任务,不会参与真正的运行 + boolean persistPendingTaskRes = getExternalTaskPersistenceService().persistPendingTask(tasks); + + // 仅成功情况累加(按严格模式累加),防止出现任务无法停止的问题。文件系统实际应该比较稳定,此处出错概率不高 + if (persistPendingTaskRes) { + externalPendingRecordNum.add(tasks.size()); + } + + log.info("[SwapTaskPersistenceService-{}] too many tasks at runtime(dbRecordNum: {}), SWAP enabled, persistence result: {}, externalPendingRecordNum: {}", instanceId, dbNum, persistPendingTaskRes, externalPendingRecordNum); + return persistPendingTaskRes; + } else { + return persistTask2Db(tasks); + } + } + + @Override + public boolean deleteAllTasks(Long instanceId) { + CommonUtils.executeIgnoreException(() -> { + if (externalTaskPersistenceService != null) { + externalTaskPersistenceService.close(); + } + }); + return dbTaskPersistenceService.deleteAllTasks(instanceId); + } + + @Override + public Map getTaskStatusStatistics(Long instanceId, Long subInstanceId) { + Map taskStatusStatistics = dbTaskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId); + if (!swapEnabled) { + return taskStatusStatistics; + } + + long waitingNum = MapUtils.getLongValue(taskStatusStatistics, TaskStatus.WAITING_DISPATCH) + externalPendingRecordNum.sum(); + long succeedNum = MapUtils.getLongValue(taskStatusStatistics, TaskStatus.WORKER_PROCESS_SUCCESS) + externalSucceedRecordNum.sum(); + long failedNum = MapUtils.getLongValue(taskStatusStatistics, TaskStatus.WORKER_PROCESS_FAILED) + externalFailedRecordNum.sum(); + + taskStatusStatistics.put(TaskStatus.WAITING_DISPATCH, waitingNum); + taskStatusStatistics.put(TaskStatus.WORKER_PROCESS_SUCCESS, succeedNum); + taskStatusStatistics.put(TaskStatus.WORKER_PROCESS_FAILED, failedNum); + + return taskStatusStatistics; + } + + @Override + public List getAllTaskResult(Long instanceId, Long subInstanceId) { + + List dbTaskResult = dbTaskPersistenceService.getAllTaskResult(instanceId, subInstanceId); + if (!swapEnabled) { + return dbTaskResult; + } + + List allTaskResult = Lists.newLinkedList(dbTaskResult); + while (true) { + List externalTask = externalTaskPersistenceService.readFinishedTask(); + if (CollectionUtils.isEmpty(externalTask)) { + break; + } + externalTask.forEach(t -> { + TaskResult taskResult = new TaskResult(); + taskResult.setTaskId(t.getTaskId()); + taskResult.setSuccess(TaskStatus.WORKER_PROCESS_SUCCESS.getValue() == t.getStatus()); + taskResult.setResult(t.getResult()); + + allTaskResult.add(taskResult); + }); + } + + return allTaskResult; + + // TODO: 后续支持 stream 流式 reduce + } + + private class YuGong extends SafeRunnable { + + @Override + protected void run0() { + + CommonUtils.easySleep(DEFAULT_SCHEDULE_TIME); + + moveInPendingTask(); + moveOutFinishedTask(); + } + + private void moveInPendingTask() { + + while (true) { + + // 外部存储无数据,无需扫描 + if (externalPendingRecordNum.sum() <= 0) { + return; + } + + // 到达 DB 最大数量后跳出扫描 + if (dbRecordNum.sum() > maxActiveTaskNum) { + return; + } + + List taskDOS = getExternalTaskPersistenceService().readPendingTask(); + + // 队列空则跳出循环,等待下一次扫描 + if (CollectionUtils.isEmpty(taskDOS)) { + log.debug("[YuGong-{}] [moveInPendingTask] readPendingTask from external is empty, finished this loop!", instanceId); + return; + } + + // 一旦读取,无论结果如何都直接减数量,无论后续结果如何 + externalPendingRecordNum.add(-taskDOS.size()); + + boolean persistTask2Db = persistTask2Db(taskDOS); + log.info("[YuGong-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistTask2Db, dbRecordNum, externalPendingRecordNum); + + // 持久化失败的情况,及时跳出本次循环,防止损失扩大,等待下次扫描 + if (!persistTask2Db) { + log.error("[YuGong-{}] [moveInPendingTask] moveIn task failed, these tasks are lost: {}", instanceId, taskDOS); + return; + } + } + } + + private void moveOutFinishedTask() { + + while (true) { + + // 一旦启动 SWAP,需要移出更多的数据来灌入,最多允许 + long maxRemainNum = maxActiveTaskNum / 2; + if (dbRecordNum.sum() <= maxRemainNum) { + return; + } + + List succeedTasks = dbTaskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WORKER_PROCESS_SUCCESS, 100); + if (!CollectionUtils.isEmpty(succeedTasks)) { + moveOutDetailFinishedTask(succeedTasks, true); + // 优先搬运成功数据,100% 已固化(失败任务可能还夜长梦多) + continue; + } + + List failedTask = dbTaskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WORKER_PROCESS_FAILED, 100); + + // 还没有已完成任务产生 or 移完了,先整体 finished 跳出循环,等待下个调度周期 + if (CollectionUtils.isEmpty(failedTask)) { + return; + } + + moveOutDetailFinishedTask(failedTask, false); + } + } + + private void moveOutDetailFinishedTask(List tasks, boolean success) { + + String logKey = String.format("[YuGong-%d] [moveOut%sTask] ", instanceId, success ? "Success" : "Failed"); + + boolean persistFinishedTask2ExternalResult = getExternalTaskPersistenceService().persistFinishedTask(tasks); + + if (!persistFinishedTask2ExternalResult) { + log.warn("{} persistFinishedTask to external failed, skip this stage!", logKey); + } + + LongAdder externalRecord = success ? externalSucceedRecordNum : externalFailedRecordNum; + + // 持久化成功,直接记录数量,无论 DB 是否删除,外部数据已存在,100% 会被并入统计 + int moveOutNum = tasks.size(); + externalRecord.add(moveOutNum); + + List deleteTaskIds = tasks.stream().map(TaskDO::getTaskId).collect(Collectors.toList()); + boolean deleteTasksByTaskIdsResult = dbTaskPersistenceService.deleteTasksByTaskIds(instanceId, deleteTaskIds); + + if (deleteTasksByTaskIdsResult) { + dbRecordNum.add(-moveOutNum); + log.info("{} move task to external successfully(movedNum: {}, currentExternalRecordNum: {}, currentDbRecordNum: {})", logKey, moveOutNum, externalRecord, dbRecordNum); + } else { + // DB 删除失败影响不大,reduce 重复而已 + log.warn("{} persistFinishedTask to external successfully but delete in runtime failed(movedNum: {}, currentExternalRecordNum: {}, currentDbRecordNum: {}), these taskIds may have duplicate results in reduce stage: {}", logKey, moveOutNum, externalRecord, dbRecordNum, deleteTaskIds); + } + } + } + + private boolean persistTask2Db(List taskDOS) { + dbRecordNum.add(taskDOS.size()); + return dbTaskPersistenceService.batchSave(taskDOS); + } + + private ExternalTaskPersistenceService getExternalTaskPersistenceService() { + if (externalTaskPersistenceService != null) { + return externalTaskPersistenceService; + } + synchronized (this) { + if (externalTaskPersistenceService != null) { + return externalTaskPersistenceService; + } + + // 初始化 SWAP 相关内容 + this.swapEnabled = true; + this.externalTaskPersistenceService = new ExternalTaskFileSystemPersistenceService(instanceId, needResult); + new Thread(new YuGong(), "PJ-YuGong-" + instanceId).start(); + + return externalTaskPersistenceService; + } + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java index 6e90acac..2c53575c 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java @@ -4,6 +4,7 @@ package tech.powerjob.worker.persistence; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.processor.TaskResult; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -18,8 +19,6 @@ public interface TaskPersistenceService { void init() throws Exception; - boolean save(TaskDO task); - boolean batchSave(List tasks); boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity); @@ -30,8 +29,6 @@ public interface TaskPersistenceService { Optional getLastTask(Long instanceId, Long subInstanceId); - List getAllTask(Long instanceId, Long subInstanceId); - List getAllUnFinishedTaskByAddress(Long instanceId, String address); List getTaskByStatus(Long instanceId, TaskStatus status, int limit); @@ -45,4 +42,6 @@ public interface TaskPersistenceService { boolean deleteAllTasks(Long instanceId); boolean deleteAllSubInstanceTasks(Long instanceId, Long subInstanceId); + + boolean deleteTasksByTaskIds(Long instanceId, Collection taskId); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java index b811fb7b..7372e9a3 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java @@ -2,6 +2,10 @@ package tech.powerjob.worker.persistence.db; import lombok.Data; import org.apache.commons.lang3.StringUtils; +import tech.powerjob.common.utils.CollectionUtils; + +import java.util.Collection; +import java.util.stream.Collectors; /** * 简单查询直接类,只支持 select * from task_info where xxx = xxx and xxx = xxx 的查询 @@ -15,6 +19,9 @@ public class SimpleTaskQuery { private static final String LINK = " and "; private String taskId; + + private Collection taskIds; + private Long subInstanceId; private Long instanceId; private String taskName; @@ -36,6 +43,11 @@ public class SimpleTaskQuery { if (!StringUtils.isEmpty(taskId)) { sb.append("task_id = '").append(taskId).append("'").append(LINK); } + if (!CollectionUtils.isEmpty(taskIds)) { + String taskIdsInQuery = taskIds.stream().map(id -> String.format("'%s'", id)).collect(Collectors.joining(", ")); + + sb.append("task_id in (").append(taskIdsInQuery).append(")").append(LINK); + } if (subInstanceId != null) { sb.append("sub_instance_id = ").append(subInstanceId).append(LINK); } diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/db/SimpleTaskQueryTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/db/SimpleTaskQueryTest.java new file mode 100644 index 00000000..ef12a946 --- /dev/null +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/db/SimpleTaskQueryTest.java @@ -0,0 +1,28 @@ +package tech.powerjob.worker.persistence.db; + +import com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * SimpleTaskQueryTest + * + * @author tjq + * @since 2024/2/24 + */ +class SimpleTaskQueryTest { + + @Test + void test() { + SimpleTaskQuery simpleTaskQuery = new SimpleTaskQuery(); + simpleTaskQuery.setInstanceId(10086L); + simpleTaskQuery.setTaskIds(Lists.newArrayList("taskId1", "taskId2", "taskId3")); + + String queryCondition = simpleTaskQuery.getQueryCondition(); + System.out.println(queryCondition); + + assertEquals("task_id in ('taskId1', 'taskId2', 'taskId3') and instance_id = 10086", queryCondition); + } + +} \ No newline at end of file