diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/CommonUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/CommonUtils.java index b174fffe..338ca4a6 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/CommonUtils.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/CommonUtils.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.function.Supplier; + /** * 公共工具类 * @@ -21,7 +22,7 @@ public class CommonUtils { * @return 函数成功执行后的返回值 * @throws Exception 执行失败,调用方自行处理 */ - public static T executeWithRetry(Supplier executor, int retryTimes, long intervalMS) throws Exception { + public static T executeWithRetry(SupplierPlus executor, int retryTimes, long intervalMS) throws Exception { if (retryTimes <= 1 || intervalMS <= 0) { return executor.get(); } @@ -35,4 +36,29 @@ public class CommonUtils { } return executor.get(); } + + /** + * 重试执行,仅适用于根据返回值决定是否执行成功的方法 + * @param booleanExecutor 需要执行的方法,其返回值决定了执行是否成功 + * @param retryTimes 重试次数 + * @param intervalMS 失败后下一次执行的间隔时间 + * @return 最终执行结果 + */ + public static boolean executeWithRetryV2(Supplier booleanExecutor, int retryTimes, long intervalMS) { + + if (retryTimes <= 1 || intervalMS <= 0) { + return booleanExecutor.get(); + } + + for (int i = 0; i < retryTimes; i++) { + try { + if (booleanExecutor.get()) { + return true; + } + Thread.sleep(intervalMS); + }catch (Exception ignore) { + } + } + return booleanExecutor.get(); + } } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/SupplierPlus.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/SupplierPlus.java new file mode 100644 index 00000000..f400db85 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/SupplierPlus.java @@ -0,0 +1,26 @@ +package com.github.kfcfans.common.utils; + +/** + * Represents a supplier of results. + * + *

There is no requirement that a new or distinct result be returned each + * time the supplier is invoked. + * + *

This is a functional interface + * whose functional method is {@link #get()}. + * + * @param the type of results supplied by this supplier + * + * @author tjq + * @since 2020/3/26 + */ +@FunctionalInterface +public interface SupplierPlus { + + /** + * Gets a result. + * + * @return a result + */ + T get() throws Exception; +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index 3feb33b1..d560edf3 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -88,36 +89,50 @@ public class TaskTracker { * 更新任务状态 * 任务状态机只允许数字递增 */ - public void updateTaskStatus(String instanceId, String taskId, int status, @Nullable String result, boolean force) { + public void updateTaskStatus(String instanceId, String taskId, int newStatus, @Nullable String result, boolean force) { - // 1. 读取当前Task状态,防止过期消息重置任务状态 - if (!force) { + boolean updateResult; + TaskStatus nTaskStatus = TaskStatus.of(newStatus); + // 1. 强制模式,直接执行持久化操作(该模式状态只允许变更为非完成状态) + if (force) { + updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, nTaskStatus, result); + }else { - TaskStatus originTaskStatus = taskPersistenceService.getTaskStatus(instanceId, taskId); + // 2. 读取当前 Task 状态,防止逆状态机变更的出现 + Optional dbTaskStatusOpt = taskPersistenceService.getTaskStatus(instanceId, taskId); - if (originTaskStatus == null) { - log.warn("[TaskTracker] database may overload..."); + if (!dbTaskStatusOpt.isPresent()) { + log.warn("[TaskTracker] get task status failed when try to update new task status, current params is instanceId={},taskId={},newStatus={}.", + instanceId, taskId, newStatus); + } + + // 数据库没查到,也允许写入(这个还需要日后仔细考虑) + if (dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH).getValue() > newStatus) { + // 必存在,但不怎么写,Java会警告... + TaskStatus dbTaskStatus = dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH); + log.warn("[TaskTracker] task(instanceId={},taskId={},dbStatus={},requestStatus={}) status conflict, taskTracker won't update the status.", + instanceId, taskId, dbTaskStatus, nTaskStatus); return; } - if (originTaskStatus.getValue() > status) { - log.warn("[TaskTracker] task(instanceId={},taskId={},dbStatus={},requestStatus={}) status conflict, this request will be drop.", - instanceId, taskId, originTaskStatus, status); - return; + // 3. 失败重试处理 + if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) { + + // 数据库查询失败的话,就只重试一次 + int failedCnt = taskPersistenceService.getTaskFailedCnt(instanceId, taskId).orElse(jobInstanceInfo.getTaskRetryNum() - 1); + if (failedCnt < jobInstanceInfo.getTaskRetryNum()) { + boolean retryTask = taskPersistenceService.updateRetryTask(instanceId, taskId, failedCnt + 1); + if (retryTask) { + log.info("[TaskTracker] task(instanceId={},taskId={}) will have a retry.", instanceId, taskId); + return; + } + } } + + // 4. 更新状态(失败重试写入DB失败的,也就不重试了...谁让你那么倒霉呢...) + updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, nTaskStatus, result); } - TaskStatus taskStatus = TaskStatus.of(status); - - // 2. 更新数据库状态 - boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, taskStatus, result); - if (!updateResult) { - try { - Thread.sleep(100); - taskPersistenceService.updateTaskStatus(instanceId, taskId, taskStatus, result); - }catch (Exception ignore) { - } - } if (!updateResult) { log.warn("[TaskTracker] update task status failed, this task(instanceId={}&taskId={}) may be processed repeatedly!", instanceId, taskId); } @@ -272,11 +287,17 @@ public class TaskTracker { } } else { - resultTask = taskPersistenceService.getLastTask(instanceId); + Optional lastTaskOptional = taskPersistenceService.getLastTask(instanceId); - // 不存在,代表前置任务刚刚执行完毕,需要创建 lastTask - if (resultTask == null) { + if (lastTaskOptional.isPresent()) { + // 存在则根据 reduce 任务来判断状态 + resultTask = lastTaskOptional.get(); + TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus()); + finishedBoolean = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED; + }else { + + // 不存在,代表前置任务刚刚执行完毕,需要创建 lastTask finishedBoolean = false; TaskDO newLastTask = new TaskDO(); @@ -284,10 +305,8 @@ public class TaskTracker { newLastTask.setTaskId(TaskConstant.LAST_TASK_ID); newLastTask.setAddress(NetUtils.getLocalHost()); addTask(Lists.newArrayList(newLastTask)); - }else { - TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus()); - finishedBoolean = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED; } + } finished.set(finishedBoolean); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java index 1fa22481..4cbf2e78 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.worker.persistence; +import java.sql.SQLException; import java.util.Collection; import java.util.List; import java.util.Map; @@ -20,20 +21,20 @@ public interface TaskDAO { /** * 插入任务数据 */ - boolean save(TaskDO task); - boolean batchSave(Collection tasks); + boolean save(TaskDO task) throws SQLException; + boolean batchSave(Collection tasks) throws SQLException; - int batchDelete(String instanceId, List taskIds); + boolean batchDelete(String instanceId, List taskIds) throws SQLException; - List simpleQuery(SimpleTaskQuery query); + List simpleQuery(SimpleTaskQuery query) throws SQLException; - List> simpleQueryPlus(SimpleTaskQuery query); + List> simpleQueryPlus(SimpleTaskQuery query) throws SQLException; - boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField); + boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) throws SQLException; /** * 查询 taskId -> taskResult (为了性能特殊定制,主要是内存占用,如果使用 simpleQueryPlus,内存中需要同时存在3份数据 ?是同时存在3份数据吗) */ - Map queryTaskId2TaskResult(String instanceId); + Map queryTaskId2TaskResult(String instanceId) throws SQLException; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java index fa081121..20bdd7c3 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java @@ -2,7 +2,6 @@ package com.github.kfcfans.oms.worker.persistence; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; import java.sql.*; @@ -16,7 +15,6 @@ import java.util.Map; * @author tjq * @since 2020/3/17 */ -@Slf4j public class TaskDAOImpl implements TaskDAO { @Override @@ -32,19 +30,16 @@ public class TaskDAOImpl implements TaskDAO { } @Override - public boolean save(TaskDO task) { + public boolean save(TaskDO task) throws SQLException { String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)"; try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { fillInsertPreparedStatement(task, ps); return ps.executeUpdate() == 1; - }catch (Exception e) { - log.error("[TaskDAO] insert failed.", e); } - return false; } @Override - public boolean batchSave(Collection tasks) { + public boolean batchSave(Collection tasks) throws SQLException { String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)"; try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { @@ -57,29 +52,22 @@ public class TaskDAOImpl implements TaskDAO { ps.executeBatch(); return true; - }catch (Exception e) { - log.error("[TaskDAO] insert failed.", e); } - return false; } @Override - public int batchDelete(String instanceId, List taskIds) { + public boolean batchDelete(String instanceId, List taskIds) throws SQLException { String deleteSQL = "delete from task_info where instance_id = '%s' and task_id in %s"; String sql = String.format(deleteSQL, instanceId, getInStringCondition(taskIds)); try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { - - return stat.executeUpdate(sql); - - }catch (Exception e) { - log.error("[TaskDAO] batchDelete failed(instanceId = {}, taskIds = {}).", instanceId, taskIds, e); + stat.executeUpdate(sql); + return true; } - return 0; } @Override - public List simpleQuery(SimpleTaskQuery query) { + public List simpleQuery(SimpleTaskQuery query) throws SQLException { ResultSet rs = null; String sql = "select * from task_info where " + query.getQueryCondition(); List result = Lists.newLinkedList(); @@ -88,9 +76,7 @@ public class TaskDAOImpl implements TaskDAO { while (rs.next()) { result.add(convert(rs)); } - }catch (Exception e) { - log.error("[TaskDAO] simpleQuery failed(sql = {}).", sql, e); - }finally { + } finally { if (rs != null) { try { rs.close(); @@ -102,7 +88,7 @@ public class TaskDAOImpl implements TaskDAO { } @Override - public List> simpleQueryPlus(SimpleTaskQuery query) { + public List> simpleQueryPlus(SimpleTaskQuery query) throws SQLException { ResultSet rs = null; String sqlFormat = "select %s from task_info where %s"; String sql = String.format(sqlFormat, query.getQueryContent(), query.getQueryCondition()); @@ -121,9 +107,7 @@ public class TaskDAOImpl implements TaskDAO { row.put(colName, colValue); } } - }catch (Exception e) { - log.error("[TaskDAO] simpleQuery failed(sql = {}).", sql, e); - }finally { + } finally { if (rs != null) { try { rs.close(); @@ -135,20 +119,17 @@ public class TaskDAOImpl implements TaskDAO { } @Override - public boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) { + public boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) throws SQLException { String sqlFormat = "update task_info set %s where %s"; String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getQueryCondition()); try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) { stat.executeUpdate(); return true; - }catch (Exception e) { - log.error("[TaskDAO] simpleUpdate failed(sql = {}).", updateField, e); - return false; } } @Override - public Map queryTaskId2TaskResult(String instanceId) { + public Map queryTaskId2TaskResult(String instanceId) throws SQLException { ResultSet rs = null; Map taskId2Result = Maps.newLinkedHashMapWithExpectedSize(4096); String sql = "select task_id, result from task_info where instance_id = ?"; @@ -158,8 +139,6 @@ public class TaskDAOImpl implements TaskDAO { while (rs.next()) { taskId2Result.put(rs.getString("task_id"), rs.getString("result")); } - }catch (Exception e) { - log.error("[TaskDAO] queryTaskId2TaskResult failed(sql = {}).", sql, e); }finally { if (rs != null) { try { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java index ff9ac80e..d370b384 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java @@ -40,7 +40,9 @@ public class TaskDO { public String getUpdateSQL() { StringBuilder sb = new StringBuilder(); - if (!StringUtils.isEmpty(address)) { + + // address 有置空需求,仅判断 NULL + if (address != null) { sb.append(" address = '").append(address).append("',"); } if (status != null) { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index 9b248501..45527d58 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -1,13 +1,19 @@ package com.github.kfcfans.oms.worker.persistence; +import com.github.kfcfans.common.utils.CommonUtils; +import com.github.kfcfans.common.utils.SupplierPlus; import com.github.kfcfans.oms.worker.common.constants.TaskConstant; import com.github.kfcfans.oms.worker.common.constants.TaskStatus; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; /** * 任务持久化服务 @@ -15,8 +21,13 @@ import java.util.Map; * @author tjq * @since 2020/3/17 */ +@Slf4j public class TaskPersistenceService { + // 默认重试参数 + private static final int RETRY_TIMES = 3; + private static final long RETRY_INTERVAL_MS = 100; + private static volatile boolean initialized = false; public static TaskPersistenceService INSTANCE = new TaskPersistenceService(); @@ -24,64 +35,90 @@ public class TaskPersistenceService { } private TaskDAO taskDAO = new TaskDAOImpl(); - private static final int MAX_BATCH_SIZE = 50; public void init() throws Exception { if (initialized) { return; } taskDAO.initTable(); + initialized = true; } public boolean save(TaskDO task) { - boolean success = taskDAO.save(task); - if (!success) { - try { - Thread.sleep(100); - success = taskDAO.save(task); - }catch (Exception ignore) { - } + + try { + return execute(() -> taskDAO.save(task)); + }catch (Exception e) { + log.error("[TaskPersistenceService] save task{} failed.", task); } - return success; + return false; } public boolean batchSave(List tasks) { if (CollectionUtils.isEmpty(tasks)) { return true; } - return taskDAO.batchSave(tasks); + try { + return execute(() -> taskDAO.batchSave(tasks)); + }catch (Exception e) { + log.error("[TaskPersistenceService] batchSave tasks failed.", e); + } + return false; } /** * 获取 MapReduce 或 Broadcast 的最后一个任务 */ - public TaskDO getLastTask(String instanceId) { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setTaskName(TaskConstant.LAST_TASK_NAME); - List taskDOS = taskDAO.simpleQuery(query); - if (CollectionUtils.isEmpty(taskDOS)) { - return null; + public Optional getLastTask(String instanceId) { + + try { + return execute(() -> { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setTaskName(TaskConstant.LAST_TASK_NAME); + List taskDOS = taskDAO.simpleQuery(query); + if (CollectionUtils.isEmpty(taskDOS)) { + return Optional.empty(); + } + return Optional.of(taskDOS.get(0)); + }); + }catch (Exception e) { + log.error("[TaskPersistenceService] get last task for instance(id={}) failed.", instanceId, e); } - return taskDOS.get(0); + + return Optional.empty(); } public List getAllTask(String instanceId) { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - return taskDAO.simpleQuery(query); + try { + return execute(() -> { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + return taskDAO.simpleQuery(query); + }); + }catch (Exception e) { + log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e); + } + return Lists.newArrayList(); } /** * 获取指定状态的Task */ public List getTaskByStatus(String instanceId, TaskStatus status, int limit) { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setStatus(status.getValue()); - query.setLimit(limit); + try { + return execute(() -> { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setStatus(status.getValue()); + query.setLimit(limit); - return taskDAO.simpleQuery(query); + return taskDAO.simpleQuery(query); + }); + }catch (Exception e) { + log.error("[TaskPersistenceService] getTaskByStatus failed, params is instanceId={},status={}.", instanceId, status, e); + } + return Lists.newArrayList(); } /** @@ -89,69 +126,144 @@ public class TaskPersistenceService { * TaskStatus -> num */ public Map getTaskStatusStatistics(String instanceId) { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setQueryContent("status, count(*) as num"); - query.setOtherCondition("GROUP BY status"); - List> dbRES = taskDAO.simpleQueryPlus(query); + try { + return execute(() -> { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setQueryContent("status, count(*) as num"); + query.setOtherCondition("GROUP BY status"); + List> dbRES = taskDAO.simpleQueryPlus(query); - Map result = Maps.newHashMap(); - dbRES.forEach(row -> { - // H2 数据库都是大写... - int status = Integer.parseInt(String.valueOf(row.get("STATUS"))); - long num = Long.parseLong(String.valueOf(row.get("NUM"))); - result.put(TaskStatus.of(status), num); - }); - return result; + Map result = Maps.newHashMap(); + dbRES.forEach(row -> { + // H2 数据库都是大写... + int status = Integer.parseInt(String.valueOf(row.get("STATUS"))); + long num = Long.parseLong(String.valueOf(row.get("NUM"))); + result.put(TaskStatus.of(status), num); + }); + return result; + }); + }catch (Exception e) { + log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e); + } + return Maps.newHashMap(); } /** * 查询 taskId -> taskResult,reduce阶段或postProcess 阶段使用 */ public Map getTaskId2ResultMap(String instanceId) { - return taskDAO.queryTaskId2TaskResult(instanceId); + try { + return execute(() -> taskDAO.queryTaskId2TaskResult(instanceId)); + }catch (Exception e) { + log.error("[TaskPersistenceService] getTaskId2ResultMap for instance(id={}) failed.", instanceId, e); + } + return Maps.newHashMap(); } /** * 查询任务状态(只查询 status,节约 I/O 资源) */ - public TaskStatus getTaskStatus(String instanceId, String taskId) { + public Optional getTaskStatus(String instanceId, String taskId) { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setTaskId(taskId); - query.setQueryContent(" STATUS "); - - List> rows = taskDAO.simpleQueryPlus(query); - - if (CollectionUtils.isEmpty(rows)) { - return null; + try { + return execute(() -> { + SimpleTaskQuery query = genKeyQuery(instanceId, taskId); + query.setQueryContent("STATUS"); + List> rows = taskDAO.simpleQueryPlus(query); + return Optional.of(TaskStatus.of((int) rows.get(0).get("STATUS"))); + }); + }catch (Exception e) { + log.error("[TaskPersistenceService] getTaskStatus failed, instanceId={},taskId={}.", instanceId, taskId, e); } + return Optional.empty(); + } - return TaskStatus.of((int) rows.get(0).get("STATUS")); + /** + * 查询任务失败数量(只查询 failed_cnt,节约 I/O 资源) + */ + public Optional getTaskFailedCnt(String instanceId, String taskId) { + + try { + return execute(() -> { + SimpleTaskQuery query = genKeyQuery(instanceId, taskId); + query.setQueryContent("failed_cnt"); + List> rows = taskDAO.simpleQueryPlus(query); + // 查询成功不可能为空 + return Optional.of((Integer) rows.get(0).get("FAILED_CNT")); + }); + }catch (Exception e) { + log.error("[TaskPersistenceService] getTaskFailedCnt failed, instanceId={},taskId={}.", instanceId, taskId, e); + } + return Optional.empty(); } /** * 更新 Task 的状态 */ public boolean updateTaskStatus(String instanceId, String taskId, TaskStatus status, String result) { - SimpleTaskQuery condition = new SimpleTaskQuery(); - condition.setInstanceId(instanceId); - condition.setTaskId(taskId); - TaskDO updateEntity = new TaskDO(); - updateEntity.setStatus(status.getValue()); - updateEntity.setResult(result); - return taskDAO.simpleUpdate(condition, updateEntity); + try { + return execute(() -> { + TaskDO updateEntity = new TaskDO(); + updateEntity.setStatus(status.getValue()); + updateEntity.setResult(result); + return taskDAO.simpleUpdate(genKeyQuery(instanceId, taskId), updateEntity); + }); + }catch (Exception e) { + log.error("[TaskPersistenceService] updateTaskStatus failed, instanceId={},taskId={},status={},result={}.", + instanceId, taskId, status, result, e); + } + return false; + } + + public boolean updateRetryTask(String instanceId, String taskId, int failedCnt) { + + try { + return execute(() -> { + TaskDO updateEntity = new TaskDO(); + updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); + // 重新选取 worker 节点重试 + updateEntity.setAddress(""); + updateEntity.setFailedCnt(failedCnt); + return taskDAO.simpleUpdate(genKeyQuery(instanceId, taskId), updateEntity); + }); + }catch (Exception e) { + log.error("[TaskPersistenceService] updateRetryTask failed, instanceId={},taskId={},failedCnt={}.", instanceId, taskId, failedCnt, e); + } + return false; } - public int batchDelete(String instanceId, List taskIds) { - return taskDAO.batchDelete(instanceId, taskIds); + public boolean batchDelete(String instanceId, List taskIds) { + try { + return execute(() -> taskDAO.batchDelete(instanceId, taskIds)); + }catch (Exception e) { + log.error("[TaskPersistenceService] batchDelete failed, instanceId={},taskIds={}.", instanceId, taskIds, e); + } + return false; } public List listAll() { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setQueryCondition("1 = 1"); - return taskDAO.simpleQuery(query); + try { + return execute(() -> { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setQueryCondition("1 = 1"); + return taskDAO.simpleQuery(query); + }); + }catch (Exception e) { + log.error("[TaskPersistenceService] listAll failed.", e); + } + return Collections.emptyList(); + } + + private static SimpleTaskQuery genKeyQuery(String instanceId, String taskId) { + SimpleTaskQuery condition = new SimpleTaskQuery(); + condition.setInstanceId(instanceId); + condition.setTaskId(taskId); + return condition; + } + + private static T execute(SupplierPlus executor) throws Exception { + return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS); } } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java index f7fdf195..310c9708 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java @@ -61,7 +61,7 @@ public class PersistenceServiceTest { public void testBatchDelete() { System.out.println("=============== testBatchDelete ==============="); - int delete = taskPersistenceService.batchDelete("100860", Lists.newArrayList("0", "1")); + boolean delete = taskPersistenceService.batchDelete("100860", Lists.newArrayList("0", "1")); System.out.println("delete result:" + delete); }