mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: [SuperMR] ExternalTaskPersistenceService
This commit is contained in:
parent
4793c19af6
commit
dda79439ca
@ -26,6 +26,7 @@ import tech.powerjob.worker.common.WorkerRuntime;
|
||||
import tech.powerjob.worker.common.utils.WorkerNetUtils;
|
||||
import tech.powerjob.worker.core.executor.ExecutorManager;
|
||||
import tech.powerjob.worker.extension.processor.ProcessorFactory;
|
||||
import tech.powerjob.worker.persistence.DbTaskPersistenceService;
|
||||
import tech.powerjob.worker.persistence.TaskPersistenceService;
|
||||
import tech.powerjob.worker.processor.PowerJobProcessorLoader;
|
||||
import tech.powerjob.worker.processor.ProcessorLoader;
|
||||
@ -122,7 +123,7 @@ public class PowerJobWorker {
|
||||
workerRuntime.setOmsLogHandler(omsLogHandler);
|
||||
|
||||
// 初始化存储
|
||||
TaskPersistenceService taskPersistenceService = new TaskPersistenceService(workerRuntime.getWorkerConfig().getStoreStrategy());
|
||||
TaskPersistenceService taskPersistenceService = new DbTaskPersistenceService(workerRuntime.getWorkerConfig().getStoreStrategy());
|
||||
taskPersistenceService.init();
|
||||
workerRuntime.setTaskPersistenceService(taskPersistenceService);
|
||||
log.info("[PowerJobWorker] local storage initialized successfully.");
|
||||
|
@ -0,0 +1,334 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import tech.powerjob.common.RemoteConstant;
|
||||
import tech.powerjob.common.utils.CollectionUtils;
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.common.utils.SupplierPlus;
|
||||
import tech.powerjob.worker.common.constants.StoreStrategy;
|
||||
import tech.powerjob.worker.common.constants.TaskConstant;
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.core.processor.TaskResult;
|
||||
import tech.powerjob.worker.persistence.db.ConnectionFactory;
|
||||
import tech.powerjob.worker.persistence.db.SimpleTaskQuery;
|
||||
import tech.powerjob.worker.persistence.db.TaskDAO;
|
||||
import tech.powerjob.worker.persistence.db.TaskDAOImpl;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* desc
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/2/23
|
||||
*/
|
||||
@Slf4j
|
||||
public class DbTaskPersistenceService implements TaskPersistenceService {
|
||||
|
||||
private final StoreStrategy strategy;
|
||||
|
||||
/**
|
||||
* 默认重试次数
|
||||
*/
|
||||
private static final int RETRY_TIMES = 3;
|
||||
|
||||
private static final long RETRY_INTERVAL_MS = 100;
|
||||
|
||||
/**
|
||||
* 慢查询定义:200ms
|
||||
*/
|
||||
private static final long SLOW_QUERY_RT_THRESHOLD = 200;
|
||||
|
||||
private TaskDAO taskDAO;
|
||||
|
||||
public DbTaskPersistenceService(StoreStrategy strategy) {
|
||||
this.strategy = strategy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() throws Exception {
|
||||
|
||||
ConnectionFactory connectionFactory = new ConnectionFactory();
|
||||
connectionFactory.initDatasource(strategy);
|
||||
|
||||
taskDAO = new TaskDAOImpl(connectionFactory);
|
||||
taskDAO.initTable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean save(TaskDO task) {
|
||||
|
||||
try {
|
||||
return execute(() -> taskDAO.save(task), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] taskId={} save cost {}ms", task.getInstanceId(), task.getTaskId(), cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] save task{} failed.", task, e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean batchSave(List<TaskDO> tasks) {
|
||||
if (CollectionUtils.isEmpty(tasks)) {
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
return execute(() -> taskDAO.batchSave(tasks), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] batchSave cost {}ms", tasks.get(0).getInstanceId(), cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 依靠主键更新 Task(不涉及 result 的,都可以用该方法更新)
|
||||
*/
|
||||
@Override
|
||||
public boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity) {
|
||||
try {
|
||||
updateEntity.setLastModifiedTime(System.currentTimeMillis());
|
||||
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
|
||||
return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTask(taskId={}) cost {}ms", instanceId, taskId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] updateTask failed.", e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新任务状态
|
||||
*/
|
||||
@Override
|
||||
public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) {
|
||||
try {
|
||||
return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTaskStatus(taskId={}) cost {}ms", instanceId, taskId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] updateTaskStatus failed.", e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新被派发到已经失联的 ProcessorTracker 的任务,重新执行
|
||||
* update task_info
|
||||
* set address = 'N/A', status = 0
|
||||
* where address in () and status not in (5,6) and instance_id = 277
|
||||
*/
|
||||
@Override
|
||||
public boolean updateLostTasks(Long instanceId, List<String> addressList, boolean retry) {
|
||||
|
||||
TaskDO updateEntity = new TaskDO();
|
||||
updateEntity.setLastModifiedTime(System.currentTimeMillis());
|
||||
if (retry) {
|
||||
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
|
||||
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
|
||||
}else {
|
||||
updateEntity.setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue());
|
||||
updateEntity.setResult("maybe worker down");
|
||||
}
|
||||
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
String queryConditionFormat = "address in %s and status not in (%d, %d)";
|
||||
String queryCondition = String.format(queryConditionFormat, CommonUtils.getInStringCondition(addressList), TaskStatus.WORKER_PROCESS_FAILED.getValue(), TaskStatus.WORKER_PROCESS_SUCCESS.getValue());
|
||||
query.setQueryCondition(queryCondition);
|
||||
log.debug("[TaskPersistenceService] updateLostTasks-QUERY-SQL: {}", query.getQueryCondition());
|
||||
|
||||
try {
|
||||
return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateLostTasks cost {}ms", instanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] updateLostTasks failed.", e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取 MapReduce 或 Broadcast 的最后一个任务
|
||||
*/
|
||||
@Override
|
||||
public Optional<TaskDO> getLastTask(Long instanceId, Long subInstanceId) {
|
||||
|
||||
try {
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setSubInstanceId(subInstanceId);
|
||||
query.setTaskName(TaskConstant.LAST_TASK_NAME);
|
||||
return execute(() -> {
|
||||
List<TaskDO> taskDOS = taskDAO.simpleQuery(query);
|
||||
if (CollectionUtils.isEmpty(taskDOS)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(taskDOS.get(0));
|
||||
}, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getLastTask cost {}ms", instanceId, subInstanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] get last task for instance(id={}) failed.", instanceId, e);
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TaskDO> getAllTask(Long instanceId, Long subInstanceId) {
|
||||
try {
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setSubInstanceId(subInstanceId);
|
||||
return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTask cost {}ms", instanceId, subInstanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e);
|
||||
}
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取某个 ProcessorTracker 未完成的任务
|
||||
* @param instanceId instanceId
|
||||
* @param address address
|
||||
* @return result
|
||||
*/
|
||||
@Override
|
||||
public List<TaskDO> getAllUnFinishedTaskByAddress(Long instanceId, String address) {
|
||||
try {
|
||||
String condition = String.format("status not in (%d, %d)", TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), TaskStatus.WORKER_PROCESS_FAILED.getValue());
|
||||
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setAddress(address);
|
||||
query.setQueryCondition(condition);
|
||||
|
||||
return execute(() -> taskDAO.simpleQuery(query) , cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getAllUnFinishedTaskByAddress({}) cost {}ms", instanceId, address, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getAllTaskByAddress for instance(id={}) failed.", instanceId, e);
|
||||
}
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定状态的Task
|
||||
*/
|
||||
@Override
|
||||
public List<TaskDO> getTaskByStatus(Long instanceId, TaskStatus status, int limit) {
|
||||
try {
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setStatus(status.getValue());
|
||||
query.setLimit(limit);
|
||||
return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTaskByStatus({}) cost {}ms", instanceId, status, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getTaskByStatus failed, params is instanceId={},status={}.", instanceId, status, e);
|
||||
}
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取 TaskTracker 管理的子 task 状态统计信息
|
||||
* TaskStatus -> num
|
||||
*/
|
||||
@Override
|
||||
public Map<TaskStatus, Long> getTaskStatusStatistics(Long instanceId, Long subInstanceId) {
|
||||
try {
|
||||
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setSubInstanceId(subInstanceId);
|
||||
query.setQueryContent("status, count(*) as num");
|
||||
query.setOtherCondition("GROUP BY status");
|
||||
|
||||
return execute(() -> {
|
||||
List<Map<String, Object>> dbRES = taskDAO.simpleQueryPlus(query);
|
||||
Map<TaskStatus, Long> result = Maps.newHashMap();
|
||||
dbRES.forEach(row -> {
|
||||
// H2 数据库都是大写...
|
||||
int status = Integer.parseInt(String.valueOf(row.get("status")));
|
||||
long num = Long.parseLong(String.valueOf(row.get("num")));
|
||||
result.put(TaskStatus.of(status), num);
|
||||
});
|
||||
return result;
|
||||
}, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getTaskStatusStatistics cost {}ms", instanceId, subInstanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e);
|
||||
}
|
||||
return Maps.newHashMap();
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询所有Task执行结果,reduce阶段 或 postProcess阶段 使用
|
||||
*/
|
||||
@Override
|
||||
public List<TaskResult> getAllTaskResult(Long instanceId, Long subInstanceId) {
|
||||
try {
|
||||
return execute(() -> taskDAO.getAllTaskResult(instanceId, subInstanceId), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTaskResult cost {}ms", instanceId, subInstanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getTaskId2ResultMap for instance(id={}) failed.", instanceId, e);
|
||||
}
|
||||
return Lists.newLinkedList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据主键查询 Task
|
||||
*/
|
||||
@Override
|
||||
public Optional<TaskDO> getTask(Long instanceId, String taskId) {
|
||||
try {
|
||||
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
|
||||
return execute(() -> {
|
||||
List<TaskDO> res = taskDAO.simpleQuery(query);
|
||||
if (CollectionUtils.isEmpty(res)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(res.get(0));
|
||||
}, cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTask(taskId={}) cost {}ms", instanceId, taskId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getTask failed, instanceId={},taskId={}.", instanceId, taskId, e);
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean deleteAllTasks(Long instanceId) {
|
||||
try {
|
||||
SimpleTaskQuery condition = new SimpleTaskQuery();
|
||||
condition.setInstanceId(instanceId);
|
||||
return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] deleteAllTasks cost {}ms", instanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean deleteAllSubInstanceTasks(Long instanceId, Long subInstanceId) {
|
||||
try {
|
||||
SimpleTaskQuery condition = new SimpleTaskQuery();
|
||||
condition.setInstanceId(instanceId);
|
||||
condition.setSubInstanceId(subInstanceId);
|
||||
return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] deleteAllSubInstanceTasks cost {}ms", instanceId, subInstanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static SimpleTaskQuery genKeyQuery(Long instanceId, String taskId) {
|
||||
SimpleTaskQuery condition = new SimpleTaskQuery();
|
||||
condition.setInstanceId(instanceId);
|
||||
condition.setTaskId(taskId);
|
||||
return condition;
|
||||
}
|
||||
|
||||
private static <T> T execute(SupplierPlus<T> executor, Consumer<Long> slowQueryLogger) throws Exception {
|
||||
long s = System.currentTimeMillis();
|
||||
try {
|
||||
return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS);
|
||||
} finally {
|
||||
long cost = System.currentTimeMillis() - s;
|
||||
if (cost > SLOW_QUERY_RT_THRESHOLD) {
|
||||
slowQueryLogger.accept(cost);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,22 +1,12 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import tech.powerjob.common.RemoteConstant;
|
||||
import tech.powerjob.common.utils.CollectionUtils;
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.common.utils.SupplierPlus;
|
||||
import tech.powerjob.worker.common.constants.StoreStrategy;
|
||||
import tech.powerjob.worker.common.constants.TaskConstant;
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.core.processor.TaskResult;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* 任务持久化服务
|
||||
@ -24,289 +14,35 @@ import java.util.function.Consumer;
|
||||
* @author tjq
|
||||
* @since 2020/3/17
|
||||
*/
|
||||
@Slf4j
|
||||
public class TaskPersistenceService {
|
||||
public interface TaskPersistenceService {
|
||||
|
||||
private final StoreStrategy strategy;
|
||||
void init() throws Exception;
|
||||
|
||||
/**
|
||||
* 默认重试次数
|
||||
*/
|
||||
private static final int RETRY_TIMES = 3;
|
||||
boolean save(TaskDO task);
|
||||
|
||||
private static final long RETRY_INTERVAL_MS = 100;
|
||||
boolean batchSave(List<TaskDO> tasks);
|
||||
|
||||
/**
|
||||
* 慢查询定义:200ms
|
||||
*/
|
||||
private static final long SLOW_QUERY_RT_THRESHOLD = 200;
|
||||
boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity);
|
||||
|
||||
private TaskDAO taskDAO;
|
||||
boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result);
|
||||
|
||||
public TaskPersistenceService(StoreStrategy strategy) {
|
||||
this.strategy = strategy;
|
||||
}
|
||||
boolean updateLostTasks(Long instanceId, List<String> addressList, boolean retry);
|
||||
|
||||
public void init() throws Exception {
|
||||
Optional<TaskDO> getLastTask(Long instanceId, Long subInstanceId);
|
||||
|
||||
ConnectionFactory connectionFactory = new ConnectionFactory();
|
||||
connectionFactory.initDatasource(strategy);
|
||||
List<TaskDO> getAllTask(Long instanceId, Long subInstanceId);
|
||||
|
||||
taskDAO = new TaskDAOImpl(connectionFactory);
|
||||
taskDAO.initTable();
|
||||
}
|
||||
List<TaskDO> getAllUnFinishedTaskByAddress(Long instanceId, String address);
|
||||
|
||||
public boolean save(TaskDO task) {
|
||||
List<TaskDO> getTaskByStatus(Long instanceId, TaskStatus status, int limit);
|
||||
|
||||
try {
|
||||
return execute(() -> taskDAO.save(task), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] taskId={} save cost {}ms", task.getInstanceId(), task.getTaskId(), cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] save task{} failed.", task, e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
Map<TaskStatus, Long> getTaskStatusStatistics(Long instanceId, Long subInstanceId);
|
||||
|
||||
public boolean batchSave(List<TaskDO> tasks) {
|
||||
if (CollectionUtils.isEmpty(tasks)) {
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
return execute(() -> taskDAO.batchSave(tasks), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] batchSave cost {}ms", tasks.get(0).getInstanceId(), cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
List<TaskResult> getAllTaskResult(Long instanceId, Long subInstanceId);
|
||||
|
||||
/**
|
||||
* 依靠主键更新 Task(不涉及 result 的,都可以用该方法更新)
|
||||
*/
|
||||
public boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity) {
|
||||
try {
|
||||
updateEntity.setLastModifiedTime(System.currentTimeMillis());
|
||||
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
|
||||
return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTask(taskId={}) cost {}ms", instanceId, taskId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] updateTask failed.", e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
Optional<TaskDO> getTask(Long instanceId, String taskId);
|
||||
|
||||
/**
|
||||
* 更新任务状态
|
||||
*/
|
||||
public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) {
|
||||
try {
|
||||
return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTaskStatus(taskId={}) cost {}ms", instanceId, taskId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] updateTaskStatus failed.", e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
boolean deleteAllTasks(Long instanceId);
|
||||
|
||||
/**
|
||||
* 更新被派发到已经失联的 ProcessorTracker 的任务,重新执行
|
||||
* update task_info
|
||||
* set address = 'N/A', status = 0
|
||||
* where address in () and status not in (5,6) and instance_id = 277
|
||||
*/
|
||||
public boolean updateLostTasks(Long instanceId, List<String> addressList, boolean retry) {
|
||||
|
||||
TaskDO updateEntity = new TaskDO();
|
||||
updateEntity.setLastModifiedTime(System.currentTimeMillis());
|
||||
if (retry) {
|
||||
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
|
||||
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
|
||||
}else {
|
||||
updateEntity.setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue());
|
||||
updateEntity.setResult("maybe worker down");
|
||||
}
|
||||
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
String queryConditionFormat = "address in %s and status not in (%d, %d)";
|
||||
String queryCondition = String.format(queryConditionFormat, CommonUtils.getInStringCondition(addressList), TaskStatus.WORKER_PROCESS_FAILED.getValue(), TaskStatus.WORKER_PROCESS_SUCCESS.getValue());
|
||||
query.setQueryCondition(queryCondition);
|
||||
log.debug("[TaskPersistenceService] updateLostTasks-QUERY-SQL: {}", query.getQueryCondition());
|
||||
|
||||
try {
|
||||
return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateLostTasks cost {}ms", instanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] updateLostTasks failed.", e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取 MapReduce 或 Broadcast 的最后一个任务
|
||||
*/
|
||||
public Optional<TaskDO> getLastTask(Long instanceId, Long subInstanceId) {
|
||||
|
||||
try {
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setSubInstanceId(subInstanceId);
|
||||
query.setTaskName(TaskConstant.LAST_TASK_NAME);
|
||||
return execute(() -> {
|
||||
List<TaskDO> taskDOS = taskDAO.simpleQuery(query);
|
||||
if (CollectionUtils.isEmpty(taskDOS)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(taskDOS.get(0));
|
||||
}, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getLastTask cost {}ms", instanceId, subInstanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] get last task for instance(id={}) failed.", instanceId, e);
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
public List<TaskDO> getAllTask(Long instanceId, Long subInstanceId) {
|
||||
try {
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setSubInstanceId(subInstanceId);
|
||||
return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTask cost {}ms", instanceId, subInstanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e);
|
||||
}
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
// 获取某个 ProcessorTracker 未完成的任务
|
||||
public List<TaskDO> getAllUnFinishedTaskByAddress(Long instanceId, String address) {
|
||||
try {
|
||||
String condition = String.format("status not in (%d, %d)", TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), TaskStatus.WORKER_PROCESS_FAILED.getValue());
|
||||
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setAddress(address);
|
||||
query.setQueryCondition(condition);
|
||||
|
||||
return execute(() -> taskDAO.simpleQuery(query) , cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getAllUnFinishedTaskByAddress({}) cost {}ms", instanceId, address, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getAllTaskByAddress for instance(id={}) failed.", instanceId, e);
|
||||
}
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定状态的Task
|
||||
*/
|
||||
public List<TaskDO> getTaskByStatus(Long instanceId, TaskStatus status, int limit) {
|
||||
try {
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setStatus(status.getValue());
|
||||
query.setLimit(limit);
|
||||
return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTaskByStatus({}) cost {}ms", instanceId, status, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getTaskByStatus failed, params is instanceId={},status={}.", instanceId, status, e);
|
||||
}
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取 TaskTracker 管理的子 task 状态统计信息
|
||||
* TaskStatus -> num
|
||||
*/
|
||||
public Map<TaskStatus, Long> getTaskStatusStatistics(Long instanceId, Long subInstanceId) {
|
||||
try {
|
||||
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setSubInstanceId(subInstanceId);
|
||||
query.setQueryContent("status, count(*) as num");
|
||||
query.setOtherCondition("GROUP BY status");
|
||||
|
||||
return execute(() -> {
|
||||
List<Map<String, Object>> dbRES = taskDAO.simpleQueryPlus(query);
|
||||
Map<TaskStatus, Long> result = Maps.newHashMap();
|
||||
dbRES.forEach(row -> {
|
||||
// H2 数据库都是大写...
|
||||
int status = Integer.parseInt(String.valueOf(row.get("status")));
|
||||
long num = Long.parseLong(String.valueOf(row.get("num")));
|
||||
result.put(TaskStatus.of(status), num);
|
||||
});
|
||||
return result;
|
||||
}, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getTaskStatusStatistics cost {}ms", instanceId, subInstanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e);
|
||||
}
|
||||
return Maps.newHashMap();
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询所有Task执行结果,reduce阶段 或 postProcess阶段 使用
|
||||
*/
|
||||
public List<TaskResult> getAllTaskResult(Long instanceId, Long subInstanceId) {
|
||||
try {
|
||||
return execute(() -> taskDAO.getAllTaskResult(instanceId, subInstanceId), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTaskResult cost {}ms", instanceId, subInstanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getTaskId2ResultMap for instance(id={}) failed.", instanceId, e);
|
||||
}
|
||||
return Lists.newLinkedList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据主键查询 Task
|
||||
*/
|
||||
public Optional<TaskDO> getTask(Long instanceId, String taskId) {
|
||||
try {
|
||||
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
|
||||
return execute(() -> {
|
||||
List<TaskDO> res = taskDAO.simpleQuery(query);
|
||||
if (CollectionUtils.isEmpty(res)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(res.get(0));
|
||||
}, cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTask(taskId={}) cost {}ms", instanceId, taskId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getTask failed, instanceId={},taskId={}.", instanceId, taskId, e);
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
|
||||
public boolean deleteAllTasks(Long instanceId) {
|
||||
try {
|
||||
SimpleTaskQuery condition = new SimpleTaskQuery();
|
||||
condition.setInstanceId(instanceId);
|
||||
return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] deleteAllTasks cost {}ms", instanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean deleteAllSubInstanceTasks(Long instanceId, Long subInstanceId) {
|
||||
try {
|
||||
SimpleTaskQuery condition = new SimpleTaskQuery();
|
||||
condition.setInstanceId(instanceId);
|
||||
condition.setSubInstanceId(subInstanceId);
|
||||
return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] deleteAllSubInstanceTasks cost {}ms", instanceId, subInstanceId, cost));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static SimpleTaskQuery genKeyQuery(Long instanceId, String taskId) {
|
||||
SimpleTaskQuery condition = new SimpleTaskQuery();
|
||||
condition.setInstanceId(instanceId);
|
||||
condition.setTaskId(taskId);
|
||||
return condition;
|
||||
}
|
||||
|
||||
private static <T> T execute(SupplierPlus<T> executor, Consumer<Long> slowQueryLogger) throws Exception {
|
||||
long s = System.currentTimeMillis();
|
||||
try {
|
||||
return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS);
|
||||
} finally {
|
||||
long cost = System.currentTimeMillis() - s;
|
||||
if (cost > SLOW_QUERY_RT_THRESHOLD) {
|
||||
slowQueryLogger.accept(cost);
|
||||
}
|
||||
}
|
||||
}
|
||||
boolean deleteAllSubInstanceTasks(Long instanceId, Long subInstanceId);
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
package tech.powerjob.worker.persistence.db;
|
||||
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.common.utils.JavaUtils;
|
@ -1,4 +1,4 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
package tech.powerjob.worker.persistence.db;
|
||||
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.StringUtils;
|
@ -1,6 +1,7 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
package tech.powerjob.worker.persistence.db;
|
||||
|
||||
import tech.powerjob.worker.core.processor.TaskResult;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.Collection;
|
@ -1,10 +1,10 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
package tech.powerjob.worker.persistence.db;
|
||||
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.core.processor.TaskResult;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.AllArgsConstructor;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
|
||||
import java.sql.*;
|
||||
import java.util.Collection;
|
@ -0,0 +1,24 @@
|
||||
package tech.powerjob.worker.persistence.fs;
|
||||
|
||||
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 外部任务持久化服务
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/2/22
|
||||
*/
|
||||
public interface ExternalTaskPersistenceService extends Closeable {
|
||||
|
||||
boolean persistPendingTask(List<TaskDO> tasks);
|
||||
|
||||
List<TaskDO> readPendingTask();
|
||||
|
||||
boolean persistFinishedTask(List<TaskDO> tasks);
|
||||
|
||||
List<TaskDO> readFinishedTask();
|
||||
}
|
@ -0,0 +1,17 @@
|
||||
package tech.powerjob.worker.persistence.fs;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* FileSystemService
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/2/22
|
||||
*/
|
||||
public interface FsService extends Closeable {
|
||||
|
||||
void writeLine(String content) throws IOException;
|
||||
|
||||
String readLine() throws IOException;
|
||||
}
|
@ -0,0 +1,116 @@
|
||||
package tech.powerjob.worker.persistence.fs.impl;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import tech.powerjob.common.serialize.JsonUtils;
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService;
|
||||
import tech.powerjob.worker.persistence.fs.FsService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 外部文件存储服务
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/2/22
|
||||
*/
|
||||
@Slf4j
|
||||
public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPersistenceService {
|
||||
private final Long instanceId;
|
||||
|
||||
private final FsService pendingFsService;
|
||||
|
||||
private final FsService resultFsService;
|
||||
|
||||
private static final String PENDING_FILE_NAME = "%d-pending";
|
||||
private static final String RESULT_FILE_NAME = "%d-result";
|
||||
|
||||
public ExternalTaskFileSystemPersistenceService(Long instanceId, boolean needResult) {
|
||||
this.instanceId = instanceId;
|
||||
|
||||
this.pendingFsService = new LocalDiskFsService(String.format(PENDING_FILE_NAME, instanceId));
|
||||
if (needResult) {
|
||||
this.resultFsService = new LocalDiskFsService(String.format(RESULT_FILE_NAME, instanceId));
|
||||
} else {
|
||||
this.resultFsService = new FsService() {
|
||||
@Override
|
||||
public void writeLine(String content) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String readLine() throws IOException {
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean persistPendingTask(List<TaskDO> tasks) {
|
||||
try {
|
||||
String content = JsonUtils.toJSONString(tasks);
|
||||
pendingFsService.writeLine(content);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("[ExternalTaskPersistenceService] [{}] persistPendingTask failed: {}", instanceId, tasks);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public List<TaskDO> readPendingTask() {
|
||||
String pendingTaskStr = pendingFsService.readLine();
|
||||
TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class);
|
||||
if (taskDOS != null) {
|
||||
return Lists.newArrayList(taskDOS);
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean persistFinishedTask(List<TaskDO> tasks) {
|
||||
try {
|
||||
String content = JsonUtils.toJSONString(tasks);
|
||||
resultFsService.writeLine(content);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("[ExternalTaskPersistenceService] [{}] persistPendingTask failed: {}", instanceId, tasks);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public List<TaskDO> readFinishedTask() {
|
||||
String pendingTaskStr = resultFsService.readLine();
|
||||
TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class);
|
||||
if (taskDOS != null) {
|
||||
return Lists.newArrayList(taskDOS);
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
CommonUtils.executeIgnoreException(() -> {
|
||||
if (pendingFsService != null) {
|
||||
pendingFsService.close();
|
||||
}
|
||||
});
|
||||
|
||||
CommonUtils.executeIgnoreException(() -> {
|
||||
if (resultFsService != null) {
|
||||
resultFsService.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,88 @@
|
||||
package tech.powerjob.worker.persistence.fs.impl;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.worker.common.utils.PowerFileUtils;
|
||||
import tech.powerjob.worker.persistence.fs.FsService;
|
||||
|
||||
import java.io.*;
|
||||
|
||||
/**
|
||||
* 本地磁盘
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/2/22
|
||||
*/
|
||||
@Slf4j
|
||||
public class LocalDiskFsService implements FsService {
|
||||
|
||||
private static final String WORKSPACE_PATH = PowerFileUtils.workspace() + "/fs/" + CommonUtils.genUUID() + "/";
|
||||
|
||||
private static final String FILE_NAME_PATTERN = "%s.powerjob";
|
||||
|
||||
private final File file;
|
||||
private final BufferedWriter bufferedWriter;
|
||||
|
||||
private final BufferedReader bufferedReader;
|
||||
|
||||
@SneakyThrows
|
||||
public LocalDiskFsService(String keyword) {
|
||||
String fileName = String.format(FILE_NAME_PATTERN, keyword);
|
||||
String filePath = WORKSPACE_PATH.concat(fileName);
|
||||
|
||||
this.file = new File(filePath);
|
||||
FileUtils.createParentDirectories(file);
|
||||
|
||||
// 在使用 BufferedReader 包装 FileReader 的情况下,不需要单独关闭 FileReader。当你调用 BufferedReader 的 close() 方法时,它会负责关闭它所包装的 FileReader。这是因为 BufferedReader.close() 方法内部会调用它所包装的流的 close() 方法,确保所有相关资源都被释放,包括底层的文件句柄
|
||||
FileWriter fileWriter = new FileWriter(file);
|
||||
this.bufferedWriter = new BufferedWriter(fileWriter);
|
||||
this.bufferedReader = new BufferedReader(new FileReader(file));
|
||||
|
||||
log.info("[LocalDiskFsService] new LocalDiskFsService successfully, path: {}", filePath);
|
||||
}
|
||||
|
||||
/**
|
||||
* 按行写数据,线程不安全,考虑到此处不用太在意性能,直接 synchronized
|
||||
* @param content 内容
|
||||
* @throws IOException 异常
|
||||
*/
|
||||
@Override
|
||||
public synchronized void writeLine(String content) throws IOException {
|
||||
bufferedWriter.write(content);
|
||||
bufferedWriter.newLine();
|
||||
bufferedWriter.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* 按行读数据,线程不安全,考虑到此处不用太在意性能,直接 synchronized
|
||||
* @return 内容
|
||||
* @throws IOException 异常
|
||||
*/
|
||||
@Override
|
||||
public synchronized String readLine() throws IOException {
|
||||
return bufferedReader.readLine();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
CommonUtils.executeIgnoreException(() -> {
|
||||
if (bufferedWriter != null) {
|
||||
bufferedWriter.close();
|
||||
}
|
||||
});
|
||||
|
||||
CommonUtils.executeIgnoreException(() -> {
|
||||
if (bufferedReader != null) {
|
||||
bufferedReader.close();
|
||||
}
|
||||
});
|
||||
|
||||
CommonUtils.executeIgnoreException(() -> {
|
||||
boolean delete = file.delete();
|
||||
log.info("[LocalDiskFsService] delete file[{}] result: {}", file, delete);
|
||||
});
|
||||
}
|
||||
}
|
@ -2,12 +2,15 @@ package tech.powerjob.worker.persistence;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.util.StopWatch;
|
||||
import tech.powerjob.worker.common.constants.StoreStrategy;
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.core.processor.TaskResult;
|
||||
import tech.powerjob.worker.persistence.db.ConnectionFactory;
|
||||
import tech.powerjob.worker.persistence.db.SimpleTaskQuery;
|
||||
import tech.powerjob.worker.persistence.db.TaskDAO;
|
||||
import tech.powerjob.worker.persistence.db.TaskDAOImpl;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -3,14 +3,16 @@ package tech.powerjob.worker.persistence;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import tech.powerjob.worker.common.constants.StoreStrategy;
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.core.processor.TaskResult;
|
||||
import tech.powerjob.worker.persistence.db.ConnectionFactory;
|
||||
import tech.powerjob.worker.persistence.db.SimpleTaskQuery;
|
||||
import tech.powerjob.worker.persistence.db.TaskDAO;
|
||||
import tech.powerjob.worker.persistence.db.TaskDAOImpl;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.SQLIntegrityConstraintViolationException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -3,6 +3,7 @@ package tech.powerjob.worker.test;
|
||||
import tech.powerjob.worker.common.constants.StoreStrategy;
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.common.utils.NetUtils;
|
||||
import tech.powerjob.worker.persistence.DbTaskPersistenceService;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.TaskPersistenceService;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -21,7 +22,7 @@ import static tech.powerjob.worker.core.tracker.task.heavy.CommonTaskTracker.ROO
|
||||
*/
|
||||
public class PersistenceServiceTest {
|
||||
|
||||
private static TaskPersistenceService taskPersistenceService = new TaskPersistenceService(StoreStrategy.DISK);
|
||||
private static final TaskPersistenceService taskPersistenceService = new DbTaskPersistenceService(StoreStrategy.DISK);
|
||||
|
||||
@BeforeAll
|
||||
public static void initTable() throws Exception {
|
||||
|
Loading…
x
Reference in New Issue
Block a user