change dao and persistence service design(tao throw exception and service retry)

This commit is contained in:
tjq 2020-03-26 22:54:52 +08:00
parent 63f082c3b3
commit bc2b84efde
8 changed files with 297 additions and 132 deletions

View File

@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.function.Supplier; import java.util.function.Supplier;
/** /**
* 公共工具类 * 公共工具类
* *
@ -21,7 +22,7 @@ public class CommonUtils {
* @return 函数成功执行后的返回值 * @return 函数成功执行后的返回值
* @throws Exception 执行失败调用方自行处理 * @throws Exception 执行失败调用方自行处理
*/ */
public static <T> T executeWithRetry(Supplier<T> executor, int retryTimes, long intervalMS) throws Exception { public static <T> T executeWithRetry(SupplierPlus<T> executor, int retryTimes, long intervalMS) throws Exception {
if (retryTimes <= 1 || intervalMS <= 0) { if (retryTimes <= 1 || intervalMS <= 0) {
return executor.get(); return executor.get();
} }
@ -35,4 +36,29 @@ public class CommonUtils {
} }
return executor.get(); return executor.get();
} }
/**
* 重试执行仅适用于根据返回值决定是否执行成功的方法
* @param booleanExecutor 需要执行的方法其返回值决定了执行是否成功
* @param retryTimes 重试次数
* @param intervalMS 失败后下一次执行的间隔时间
* @return 最终执行结果
*/
public static boolean executeWithRetryV2(Supplier<Boolean> booleanExecutor, int retryTimes, long intervalMS) {
if (retryTimes <= 1 || intervalMS <= 0) {
return booleanExecutor.get();
}
for (int i = 0; i < retryTimes; i++) {
try {
if (booleanExecutor.get()) {
return true;
}
Thread.sleep(intervalMS);
}catch (Exception ignore) {
}
}
return booleanExecutor.get();
}
} }

View File

@ -0,0 +1,26 @@
package com.github.kfcfans.common.utils;
/**
* Represents a supplier of results.
*
* <p>There is no requirement that a new or distinct result be returned each
* time the supplier is invoked.
*
* <p>This is a <a href="package-summary.html">functional interface</a>
* whose functional method is {@link #get()}.
*
* @param <T> the type of results supplied by this supplier
*
* @author tjq
* @since 2020/3/26
*/
@FunctionalInterface
public interface SupplierPlus<T> {
/**
* Gets a result.
*
* @return a result
*/
T get() throws Exception;
}

View File

@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -88,36 +89,50 @@ public class TaskTracker {
* 更新任务状态 * 更新任务状态
* 任务状态机只允许数字递增 * 任务状态机只允许数字递增
*/ */
public void updateTaskStatus(String instanceId, String taskId, int status, @Nullable String result, boolean force) { public void updateTaskStatus(String instanceId, String taskId, int newStatus, @Nullable String result, boolean force) {
// 1. 读取当前Task状态防止过期消息重置任务状态 boolean updateResult;
if (!force) { TaskStatus nTaskStatus = TaskStatus.of(newStatus);
// 1. 强制模式直接执行持久化操作该模式状态只允许变更为非完成状态
if (force) {
updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, nTaskStatus, result);
}else {
TaskStatus originTaskStatus = taskPersistenceService.getTaskStatus(instanceId, taskId); // 2. 读取当前 Task 状态防止逆状态机变更的出现
Optional<TaskStatus> dbTaskStatusOpt = taskPersistenceService.getTaskStatus(instanceId, taskId);
if (originTaskStatus == null) { if (!dbTaskStatusOpt.isPresent()) {
log.warn("[TaskTracker] database may overload..."); log.warn("[TaskTracker] get task status failed when try to update new task status, current params is instanceId={},taskId={},newStatus={}.",
instanceId, taskId, newStatus);
}
// 数据库没查到也允许写入这个还需要日后仔细考虑
if (dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH).getValue() > newStatus) {
// 必存在但不怎么写Java会警告...
TaskStatus dbTaskStatus = dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH);
log.warn("[TaskTracker] task(instanceId={},taskId={},dbStatus={},requestStatus={}) status conflict, taskTracker won't update the status.",
instanceId, taskId, dbTaskStatus, nTaskStatus);
return; return;
} }
if (originTaskStatus.getValue() > status) { // 3. 失败重试处理
log.warn("[TaskTracker] task(instanceId={},taskId={},dbStatus={},requestStatus={}) status conflict, this request will be drop.", if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
instanceId, taskId, originTaskStatus, status);
// 数据库查询失败的话就只重试一次
int failedCnt = taskPersistenceService.getTaskFailedCnt(instanceId, taskId).orElse(jobInstanceInfo.getTaskRetryNum() - 1);
if (failedCnt < jobInstanceInfo.getTaskRetryNum()) {
boolean retryTask = taskPersistenceService.updateRetryTask(instanceId, taskId, failedCnt + 1);
if (retryTask) {
log.info("[TaskTracker] task(instanceId={},taskId={}) will have a retry.", instanceId, taskId);
return; return;
} }
} }
TaskStatus taskStatus = TaskStatus.of(status);
// 2. 更新数据库状态
boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, taskStatus, result);
if (!updateResult) {
try {
Thread.sleep(100);
taskPersistenceService.updateTaskStatus(instanceId, taskId, taskStatus, result);
}catch (Exception ignore) {
} }
// 4. 更新状态失败重试写入DB失败的也就不重试了...谁让你那么倒霉呢...
updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, nTaskStatus, result);
} }
if (!updateResult) { if (!updateResult) {
log.warn("[TaskTracker] update task status failed, this task(instanceId={}&taskId={}) may be processed repeatedly!", instanceId, taskId); log.warn("[TaskTracker] update task status failed, this task(instanceId={}&taskId={}) may be processed repeatedly!", instanceId, taskId);
} }
@ -272,11 +287,17 @@ public class TaskTracker {
} }
} else { } else {
resultTask = taskPersistenceService.getLastTask(instanceId); Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId);
if (lastTaskOptional.isPresent()) {
// 存在则根据 reduce 任务来判断状态
resultTask = lastTaskOptional.get();
TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());
finishedBoolean = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED;
}else {
// 不存在代表前置任务刚刚执行完毕需要创建 lastTask // 不存在代表前置任务刚刚执行完毕需要创建 lastTask
if (resultTask == null) {
finishedBoolean = false; finishedBoolean = false;
TaskDO newLastTask = new TaskDO(); TaskDO newLastTask = new TaskDO();
@ -284,10 +305,8 @@ public class TaskTracker {
newLastTask.setTaskId(TaskConstant.LAST_TASK_ID); newLastTask.setTaskId(TaskConstant.LAST_TASK_ID);
newLastTask.setAddress(NetUtils.getLocalHost()); newLastTask.setAddress(NetUtils.getLocalHost());
addTask(Lists.newArrayList(newLastTask)); addTask(Lists.newArrayList(newLastTask));
}else {
TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());
finishedBoolean = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED;
} }
} }
finished.set(finishedBoolean); finished.set(finishedBoolean);
} }

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.worker.persistence; package com.github.kfcfans.oms.worker.persistence;
import java.sql.SQLException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -20,20 +21,20 @@ public interface TaskDAO {
/** /**
* 插入任务数据 * 插入任务数据
*/ */
boolean save(TaskDO task); boolean save(TaskDO task) throws SQLException;
boolean batchSave(Collection<TaskDO> tasks); boolean batchSave(Collection<TaskDO> tasks) throws SQLException;
int batchDelete(String instanceId, List<String> taskIds); boolean batchDelete(String instanceId, List<String> taskIds) throws SQLException;
List<TaskDO> simpleQuery(SimpleTaskQuery query); List<TaskDO> simpleQuery(SimpleTaskQuery query) throws SQLException;
List<Map<String, Object>> simpleQueryPlus(SimpleTaskQuery query); List<Map<String, Object>> simpleQueryPlus(SimpleTaskQuery query) throws SQLException;
boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField); boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) throws SQLException;
/** /**
* 查询 taskId -> taskResult (为了性能特殊定制主要是内存占用如果使用 simpleQueryPlus内存中需要同时存在3份数据 是同时存在3份数据吗) * 查询 taskId -> taskResult (为了性能特殊定制主要是内存占用如果使用 simpleQueryPlus内存中需要同时存在3份数据 是同时存在3份数据吗)
*/ */
Map<String, String> queryTaskId2TaskResult(String instanceId); Map<String, String> queryTaskId2TaskResult(String instanceId) throws SQLException;
} }

View File

@ -2,7 +2,6 @@ package com.github.kfcfans.oms.worker.persistence;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.sql.*; import java.sql.*;
@ -16,7 +15,6 @@ import java.util.Map;
* @author tjq * @author tjq
* @since 2020/3/17 * @since 2020/3/17
*/ */
@Slf4j
public class TaskDAOImpl implements TaskDAO { public class TaskDAOImpl implements TaskDAO {
@Override @Override
@ -32,19 +30,16 @@ public class TaskDAOImpl implements TaskDAO {
} }
@Override @Override
public boolean save(TaskDO task) { public boolean save(TaskDO task) throws SQLException {
String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)"; String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) {
fillInsertPreparedStatement(task, ps); fillInsertPreparedStatement(task, ps);
return ps.executeUpdate() == 1; return ps.executeUpdate() == 1;
}catch (Exception e) {
log.error("[TaskDAO] insert failed.", e);
} }
return false;
} }
@Override @Override
public boolean batchSave(Collection<TaskDO> tasks) { public boolean batchSave(Collection<TaskDO> tasks) throws SQLException {
String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)"; String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) {
@ -57,29 +52,22 @@ public class TaskDAOImpl implements TaskDAO {
ps.executeBatch(); ps.executeBatch();
return true; return true;
}catch (Exception e) {
log.error("[TaskDAO] insert failed.", e);
} }
return false;
} }
@Override @Override
public int batchDelete(String instanceId, List<String> taskIds) { public boolean batchDelete(String instanceId, List<String> taskIds) throws SQLException {
String deleteSQL = "delete from task_info where instance_id = '%s' and task_id in %s"; String deleteSQL = "delete from task_info where instance_id = '%s' and task_id in %s";
String sql = String.format(deleteSQL, instanceId, getInStringCondition(taskIds)); String sql = String.format(deleteSQL, instanceId, getInStringCondition(taskIds));
try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) {
stat.executeUpdate(sql);
return stat.executeUpdate(sql); return true;
}catch (Exception e) {
log.error("[TaskDAO] batchDelete failed(instanceId = {}, taskIds = {}).", instanceId, taskIds, e);
} }
return 0;
} }
@Override @Override
public List<TaskDO> simpleQuery(SimpleTaskQuery query) { public List<TaskDO> simpleQuery(SimpleTaskQuery query) throws SQLException {
ResultSet rs = null; ResultSet rs = null;
String sql = "select * from task_info where " + query.getQueryCondition(); String sql = "select * from task_info where " + query.getQueryCondition();
List<TaskDO> result = Lists.newLinkedList(); List<TaskDO> result = Lists.newLinkedList();
@ -88,9 +76,7 @@ public class TaskDAOImpl implements TaskDAO {
while (rs.next()) { while (rs.next()) {
result.add(convert(rs)); result.add(convert(rs));
} }
}catch (Exception e) { } finally {
log.error("[TaskDAO] simpleQuery failed(sql = {}).", sql, e);
}finally {
if (rs != null) { if (rs != null) {
try { try {
rs.close(); rs.close();
@ -102,7 +88,7 @@ public class TaskDAOImpl implements TaskDAO {
} }
@Override @Override
public List<Map<String, Object>> simpleQueryPlus(SimpleTaskQuery query) { public List<Map<String, Object>> simpleQueryPlus(SimpleTaskQuery query) throws SQLException {
ResultSet rs = null; ResultSet rs = null;
String sqlFormat = "select %s from task_info where %s"; String sqlFormat = "select %s from task_info where %s";
String sql = String.format(sqlFormat, query.getQueryContent(), query.getQueryCondition()); String sql = String.format(sqlFormat, query.getQueryContent(), query.getQueryCondition());
@ -121,9 +107,7 @@ public class TaskDAOImpl implements TaskDAO {
row.put(colName, colValue); row.put(colName, colValue);
} }
} }
}catch (Exception e) { } finally {
log.error("[TaskDAO] simpleQuery failed(sql = {}).", sql, e);
}finally {
if (rs != null) { if (rs != null) {
try { try {
rs.close(); rs.close();
@ -135,20 +119,17 @@ public class TaskDAOImpl implements TaskDAO {
} }
@Override @Override
public boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) { public boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) throws SQLException {
String sqlFormat = "update task_info set %s where %s"; String sqlFormat = "update task_info set %s where %s";
String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getQueryCondition()); String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getQueryCondition());
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) { try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) {
stat.executeUpdate(); stat.executeUpdate();
return true; return true;
}catch (Exception e) {
log.error("[TaskDAO] simpleUpdate failed(sql = {}).", updateField, e);
return false;
} }
} }
@Override @Override
public Map<String, String> queryTaskId2TaskResult(String instanceId) { public Map<String, String> queryTaskId2TaskResult(String instanceId) throws SQLException {
ResultSet rs = null; ResultSet rs = null;
Map<String, String> taskId2Result = Maps.newLinkedHashMapWithExpectedSize(4096); Map<String, String> taskId2Result = Maps.newLinkedHashMapWithExpectedSize(4096);
String sql = "select task_id, result from task_info where instance_id = ?"; String sql = "select task_id, result from task_info where instance_id = ?";
@ -158,8 +139,6 @@ public class TaskDAOImpl implements TaskDAO {
while (rs.next()) { while (rs.next()) {
taskId2Result.put(rs.getString("task_id"), rs.getString("result")); taskId2Result.put(rs.getString("task_id"), rs.getString("result"));
} }
}catch (Exception e) {
log.error("[TaskDAO] queryTaskId2TaskResult failed(sql = {}).", sql, e);
}finally { }finally {
if (rs != null) { if (rs != null) {
try { try {

View File

@ -40,7 +40,9 @@ public class TaskDO {
public String getUpdateSQL() { public String getUpdateSQL() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
if (!StringUtils.isEmpty(address)) {
// address 有置空需求仅判断 NULL
if (address != null) {
sb.append(" address = '").append(address).append("',"); sb.append(" address = '").append(address).append("',");
} }
if (status != null) { if (status != null) {

View File

@ -1,13 +1,19 @@
package com.github.kfcfans.oms.worker.persistence; package com.github.kfcfans.oms.worker.persistence;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.common.utils.SupplierPlus;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant; import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
/** /**
* 任务持久化服务 * 任务持久化服务
@ -15,8 +21,13 @@ import java.util.Map;
* @author tjq * @author tjq
* @since 2020/3/17 * @since 2020/3/17
*/ */
@Slf4j
public class TaskPersistenceService { public class TaskPersistenceService {
// 默认重试参数
private static final int RETRY_TIMES = 3;
private static final long RETRY_INTERVAL_MS = 100;
private static volatile boolean initialized = false; private static volatile boolean initialized = false;
public static TaskPersistenceService INSTANCE = new TaskPersistenceService(); public static TaskPersistenceService INSTANCE = new TaskPersistenceService();
@ -24,64 +35,90 @@ public class TaskPersistenceService {
} }
private TaskDAO taskDAO = new TaskDAOImpl(); private TaskDAO taskDAO = new TaskDAOImpl();
private static final int MAX_BATCH_SIZE = 50;
public void init() throws Exception { public void init() throws Exception {
if (initialized) { if (initialized) {
return; return;
} }
taskDAO.initTable(); taskDAO.initTable();
initialized = true;
} }
public boolean save(TaskDO task) { public boolean save(TaskDO task) {
boolean success = taskDAO.save(task);
if (!success) {
try { try {
Thread.sleep(100); return execute(() -> taskDAO.save(task));
success = taskDAO.save(task); }catch (Exception e) {
}catch (Exception ignore) { log.error("[TaskPersistenceService] save task{} failed.", task);
} }
} return false;
return success;
} }
public boolean batchSave(List<TaskDO> tasks) { public boolean batchSave(List<TaskDO> tasks) {
if (CollectionUtils.isEmpty(tasks)) { if (CollectionUtils.isEmpty(tasks)) {
return true; return true;
} }
return taskDAO.batchSave(tasks); try {
return execute(() -> taskDAO.batchSave(tasks));
}catch (Exception e) {
log.error("[TaskPersistenceService] batchSave tasks failed.", e);
}
return false;
} }
/** /**
* 获取 MapReduce Broadcast 的最后一个任务 * 获取 MapReduce Broadcast 的最后一个任务
*/ */
public TaskDO getLastTask(String instanceId) { public Optional<TaskDO> getLastTask(String instanceId) {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery(); SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId); query.setInstanceId(instanceId);
query.setTaskName(TaskConstant.LAST_TASK_NAME); query.setTaskName(TaskConstant.LAST_TASK_NAME);
List<TaskDO> taskDOS = taskDAO.simpleQuery(query); List<TaskDO> taskDOS = taskDAO.simpleQuery(query);
if (CollectionUtils.isEmpty(taskDOS)) { if (CollectionUtils.isEmpty(taskDOS)) {
return null; return Optional.empty();
} }
return taskDOS.get(0); return Optional.of(taskDOS.get(0));
});
}catch (Exception e) {
log.error("[TaskPersistenceService] get last task for instance(id={}) failed.", instanceId, e);
}
return Optional.empty();
} }
public List<TaskDO> getAllTask(String instanceId) { public List<TaskDO> getAllTask(String instanceId) {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery(); SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId); query.setInstanceId(instanceId);
return taskDAO.simpleQuery(query); return taskDAO.simpleQuery(query);
});
}catch (Exception e) {
log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e);
}
return Lists.newArrayList();
} }
/** /**
* 获取指定状态的Task * 获取指定状态的Task
*/ */
public List<TaskDO> getTaskByStatus(String instanceId, TaskStatus status, int limit) { public List<TaskDO> getTaskByStatus(String instanceId, TaskStatus status, int limit) {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery(); SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId); query.setInstanceId(instanceId);
query.setStatus(status.getValue()); query.setStatus(status.getValue());
query.setLimit(limit); query.setLimit(limit);
return taskDAO.simpleQuery(query); return taskDAO.simpleQuery(query);
});
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskByStatus failed, params is instanceId={},status={}.", instanceId, status, e);
}
return Lists.newArrayList();
} }
/** /**
@ -89,6 +126,8 @@ public class TaskPersistenceService {
* TaskStatus -> num * TaskStatus -> num
*/ */
public Map<TaskStatus, Long> getTaskStatusStatistics(String instanceId) { public Map<TaskStatus, Long> getTaskStatusStatistics(String instanceId) {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery(); SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId); query.setInstanceId(instanceId);
query.setQueryContent("status, count(*) as num"); query.setQueryContent("status, count(*) as num");
@ -103,55 +142,128 @@ public class TaskPersistenceService {
result.put(TaskStatus.of(status), num); result.put(TaskStatus.of(status), num);
}); });
return result; return result;
});
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e);
}
return Maps.newHashMap();
} }
/** /**
* 查询 taskId -> taskResultreduce阶段或postProcess 阶段使用 * 查询 taskId -> taskResultreduce阶段或postProcess 阶段使用
*/ */
public Map<String, String> getTaskId2ResultMap(String instanceId) { public Map<String, String> getTaskId2ResultMap(String instanceId) {
return taskDAO.queryTaskId2TaskResult(instanceId); try {
return execute(() -> taskDAO.queryTaskId2TaskResult(instanceId));
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskId2ResultMap for instance(id={}) failed.", instanceId, e);
}
return Maps.newHashMap();
} }
/** /**
* 查询任务状态只查询 status节约 I/O 资源 * 查询任务状态只查询 status节约 I/O 资源
*/ */
public TaskStatus getTaskStatus(String instanceId, String taskId) { public Optional<TaskStatus> getTaskStatus(String instanceId, String taskId) {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setTaskId(taskId);
query.setQueryContent(" STATUS ");
try {
return execute(() -> {
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
query.setQueryContent("STATUS");
List<Map<String, Object>> rows = taskDAO.simpleQueryPlus(query); List<Map<String, Object>> rows = taskDAO.simpleQueryPlus(query);
return Optional.of(TaskStatus.of((int) rows.get(0).get("STATUS")));
if (CollectionUtils.isEmpty(rows)) { });
return null; }catch (Exception e) {
log.error("[TaskPersistenceService] getTaskStatus failed, instanceId={},taskId={}.", instanceId, taskId, e);
}
return Optional.empty();
} }
return TaskStatus.of((int) rows.get(0).get("STATUS")); /**
* 查询任务失败数量只查询 failed_cnt节约 I/O 资源
*/
public Optional<Integer> getTaskFailedCnt(String instanceId, String taskId) {
try {
return execute(() -> {
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
query.setQueryContent("failed_cnt");
List<Map<String, Object>> rows = taskDAO.simpleQueryPlus(query);
// 查询成功不可能为空
return Optional.of((Integer) rows.get(0).get("FAILED_CNT"));
});
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskFailedCnt failed, instanceId={},taskId={}.", instanceId, taskId, e);
}
return Optional.empty();
} }
/** /**
* 更新 Task 的状态 * 更新 Task 的状态
*/ */
public boolean updateTaskStatus(String instanceId, String taskId, TaskStatus status, String result) { public boolean updateTaskStatus(String instanceId, String taskId, TaskStatus status, String result) {
SimpleTaskQuery condition = new SimpleTaskQuery(); try {
condition.setInstanceId(instanceId); return execute(() -> {
condition.setTaskId(taskId);
TaskDO updateEntity = new TaskDO(); TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(status.getValue()); updateEntity.setStatus(status.getValue());
updateEntity.setResult(result); updateEntity.setResult(result);
return taskDAO.simpleUpdate(condition, updateEntity); return taskDAO.simpleUpdate(genKeyQuery(instanceId, taskId), updateEntity);
});
}catch (Exception e) {
log.error("[TaskPersistenceService] updateTaskStatus failed, instanceId={},taskId={},status={},result={}.",
instanceId, taskId, status, result, e);
}
return false;
}
public boolean updateRetryTask(String instanceId, String taskId, int failedCnt) {
try {
return execute(() -> {
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
// 重新选取 worker 节点重试
updateEntity.setAddress("");
updateEntity.setFailedCnt(failedCnt);
return taskDAO.simpleUpdate(genKeyQuery(instanceId, taskId), updateEntity);
});
}catch (Exception e) {
log.error("[TaskPersistenceService] updateRetryTask failed, instanceId={},taskId={},failedCnt={}.", instanceId, taskId, failedCnt, e);
}
return false;
} }
public int batchDelete(String instanceId, List<String> taskIds) { public boolean batchDelete(String instanceId, List<String> taskIds) {
return taskDAO.batchDelete(instanceId, taskIds); try {
return execute(() -> taskDAO.batchDelete(instanceId, taskIds));
}catch (Exception e) {
log.error("[TaskPersistenceService] batchDelete failed, instanceId={},taskIds={}.", instanceId, taskIds, e);
}
return false;
} }
public List<TaskDO> listAll() { public List<TaskDO> listAll() {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery(); SimpleTaskQuery query = new SimpleTaskQuery();
query.setQueryCondition("1 = 1"); query.setQueryCondition("1 = 1");
return taskDAO.simpleQuery(query); return taskDAO.simpleQuery(query);
});
}catch (Exception e) {
log.error("[TaskPersistenceService] listAll failed.", e);
}
return Collections.emptyList();
}
private static SimpleTaskQuery genKeyQuery(String instanceId, String taskId) {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
condition.setTaskId(taskId);
return condition;
}
private static <T> T execute(SupplierPlus<T> executor) throws Exception {
return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS);
} }
} }

View File

@ -61,7 +61,7 @@ public class PersistenceServiceTest {
public void testBatchDelete() { public void testBatchDelete() {
System.out.println("=============== testBatchDelete ==============="); System.out.println("=============== testBatchDelete ===============");
int delete = taskPersistenceService.batchDelete("100860", Lists.newArrayList("0", "1")); boolean delete = taskPersistenceService.batchDelete("100860", Lists.newArrayList("0", "1"));
System.out.println("delete result:" + delete); System.out.println("delete result:" + delete);
} }