feat: [SuperMR] YuGong

This commit is contained in:
tjq 2024-02-24 11:50:16 +08:00
parent dda79439ca
commit cf4ed93812
10 changed files with 444 additions and 38 deletions

View File

@ -55,4 +55,11 @@ public class PowerJobDKey {
*/
public static final String FREQUENCY_JOB_MAX_INTERVAL = "powerjob.server.frequency-job.max-interval";
/* ******************* 不太可能有人用的参数 ******************* */
/**
* 最大活跃任务数量超出部分 SWAP 到磁盘以提升性能
*/
public static final String WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM = "powerjob.worker.max-active-task-num";
}

View File

@ -0,0 +1,49 @@
package tech.powerjob.common.utils;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.Map;
/**
* MapUtils
*
* @author tjq
* @since 2024/2/24
*/
public class MapUtils {
public static <K> long getLongValue(Map<? super K, ?> map, K key) {
Long longObject = getLong(map, key);
return longObject == null ? 0L : longObject;
}
public static <K> Long getLong(Map<? super K, ?> map, K key) {
Number answer = getNumber(map, key);
if (answer == null) {
return null;
} else {
return answer instanceof Long ? (Long)answer : answer.longValue();
}
}
public static <K> Number getNumber(Map<? super K, ?> map, K key) {
if (map != null) {
Object answer = map.get(key);
if (answer != null) {
if (answer instanceof Number) {
return (Number)answer;
}
if (answer instanceof String) {
try {
String text = (String)answer;
return NumberFormat.getInstance().parse(text);
} catch (ParseException var4) {
}
}
}
}
return null;
}
}

View File

@ -18,6 +18,7 @@ import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.persistence.TaskDO;
import java.util.List;
@ -115,7 +116,7 @@ public class CommonTaskTracker extends HeavyTaskTracker {
rootTask.setLastReportTime(-1L);
rootTask.setSubInstanceId(instanceId);
if (taskPersistenceService.save(rootTask)) {
if (taskPersistenceService.batchSave(Lists.newArrayList(rootTask))) {
log.info("[TaskTracker-{}] create root task successfully.", instanceId);
} else {
log.error("[TaskTracker-{}] create root task failed.", instanceId);
@ -171,13 +172,13 @@ public class CommonTaskTracker extends HeavyTaskTracker {
// STANDALONE 只有一个任务完成即结束
case STANDALONE:
finished.set(true);
List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
List<TaskResult> allTaskResult = taskPersistenceService.getAllTaskResult(instanceId, instanceId);
if (CollectionUtils.isEmpty(allTaskResult) || allTaskResult.size() > 1) {
result = SystemInstanceResult.UNKNOWN_BUG;
log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
} else {
result = allTask.get(0).getResult();
success = allTask.get(0).getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
result = allTaskResult.get(0).getResult();
success = allTaskResult.get(0).isSuccess();
}
break;
// MAP 不关心结果最简单

View File

@ -22,6 +22,7 @@ import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.LRUCache;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.persistence.TaskDO;
import java.util.*;
@ -203,7 +204,7 @@ public class FrequentTaskTracker extends HeavyTaskTracker {
}
// 必须先持久化持久化成功才能 dispatch否则会导致后续报错因为DB中没有这个taskId对应的记录会各种报错
if (!taskPersistenceService.save(newRootTask)) {
if (!taskPersistenceService.batchSave(Lists.newArrayList(newRootTask))) {
log.error("[FQTaskTracker-{}] Launcher create new root task failed.", instanceId);
processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED");
return;
@ -298,9 +299,8 @@ public class FrequentTaskTracker extends HeavyTaskTracker {
// STANDALONE 代表任务确实已经执行完毕了
case STANDALONE:
// 查询数据库获取结果STANDALONE每个SubInstance只会有一条Task记录
TaskDO resultTask = taskPersistenceService.getAllTask(instanceId, subInstanceId).get(0);
boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
onFinished(subInstanceId, success, resultTask.getResult(), iterator);
TaskResult resultTask = taskPersistenceService.getAllTaskResult(instanceId, subInstanceId).get(0);
onFinished(subInstanceId, resultTask.isSuccess(), resultTask.getResult(), iterator);
continue;
// MAP 不关心结果最简单
case MAP:

View File

@ -57,7 +57,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
/**
* 数据库持久化服务
*/
protected final TaskPersistenceService taskPersistenceService;
protected TaskPersistenceService taskPersistenceService;
/**
* 定时任务线程池
*/

View File

@ -16,6 +16,7 @@ import tech.powerjob.worker.persistence.db.SimpleTaskQuery;
import tech.powerjob.worker.persistence.db.TaskDAO;
import tech.powerjob.worker.persistence.db.TaskDAOImpl;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -60,17 +61,6 @@ public class DbTaskPersistenceService implements TaskPersistenceService {
taskDAO.initTable();
}
@Override
public boolean save(TaskDO task) {
try {
return execute(() -> taskDAO.save(task), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] taskId={} save cost {}ms", task.getInstanceId(), task.getTaskId(), cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] save task{} failed.", task, e);
}
return false;
}
@Override
public boolean batchSave(List<TaskDO> tasks) {
if (CollectionUtils.isEmpty(tasks)) {
@ -171,19 +161,6 @@ public class DbTaskPersistenceService implements TaskPersistenceService {
return Optional.empty();
}
@Override
public List<TaskDO> getAllTask(Long instanceId, Long subInstanceId) {
try {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setSubInstanceId(subInstanceId);
return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTask cost {}ms", instanceId, subInstanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e);
}
return Lists.newArrayList();
}
/**
* 获取某个 ProcessorTracker 未完成的任务
* @param instanceId instanceId
@ -313,6 +290,19 @@ public class DbTaskPersistenceService implements TaskPersistenceService {
return false;
}
@Override
public boolean deleteTasksByTaskIds(Long instanceId, Collection<String> taskId) {
try {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
condition.setTaskIds(taskId);
return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] deleteTasksByTaskIds cost {}ms", instanceId, cost));
}catch (Exception e) {
log.error("[TaskPersistenceService] deleteTasksByTaskIds failed, instanceId={}.", instanceId, e);
}
return false;
}
private static SimpleTaskQuery genKeyQuery(Long instanceId, String taskId) {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);

View File

@ -0,0 +1,320 @@
package tech.powerjob.worker.persistence;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.enhance.SafeRunnable;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.MapUtils;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService;
import tech.powerjob.worker.persistence.fs.impl.ExternalTaskFileSystemPersistenceService;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
/**
* SWAP换入换出降低运行时开销
*
* @author tjq
* @since 2024/2/23
*/
@Slf4j
public class SwapTaskPersistenceService implements TaskPersistenceService {
private final Long instanceId;
private final long maxActiveTaskNum;
/**
* 数据库记录数量不要求完全精确仅用于控制存哪里有一定容忍度
*/
private final LongAdder dbRecordNum = new LongAdder();
/**
* 外部存储的任务数量必须精确否则会导致任务卡住
*/
private final LongAdder externalPendingRecordNum = new LongAdder();
private final LongAdder externalSucceedRecordNum = new LongAdder();
private final LongAdder externalFailedRecordNum = new LongAdder();
private final boolean needResult;
private final TaskPersistenceService dbTaskPersistenceService;
private boolean swapEnabled;
private ExternalTaskPersistenceService externalTaskPersistenceService;
private static final long DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM = 100000;
/**
* 默认工作频率
*/
private static final long DEFAULT_SCHEDULE_TIME = 60000;
public SwapTaskPersistenceService(Long instanceId, boolean needResult, TaskPersistenceService dbTaskPersistenceService) {
this.instanceId = instanceId;
this.needResult = needResult;
this.dbTaskPersistenceService = dbTaskPersistenceService;
this.maxActiveTaskNum = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM, String.valueOf(DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM)));
}
@Override
public void init() throws Exception {
}
@Override
public boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity) {
return dbTaskPersistenceService.updateTask(instanceId, taskId, updateEntity);
}
@Override
public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) {
return dbTaskPersistenceService.updateTaskStatus(instanceId, taskId, status, lastReportTime, result);
}
@Override
public boolean updateLostTasks(Long instanceId, List<String> addressList, boolean retry) {
return dbTaskPersistenceService.updateLostTasks(instanceId, addressList, retry);
}
@Override
public Optional<TaskDO> getLastTask(Long instanceId, Long subInstanceId) {
return dbTaskPersistenceService.getLastTask(instanceId, subInstanceId);
}
@Override
public List<TaskDO> getAllUnFinishedTaskByAddress(Long instanceId, String address) {
return dbTaskPersistenceService.getAllUnFinishedTaskByAddress(instanceId, address);
}
@Override
public List<TaskDO> getTaskByStatus(Long instanceId, TaskStatus status, int limit) {
return dbTaskPersistenceService.getTaskByStatus(instanceId, status, limit);
}
@Override
public Optional<TaskDO> getTask(Long instanceId, String taskId) {
return dbTaskPersistenceService.getTask(instanceId, taskId);
}
@Override
public boolean deleteAllSubInstanceTasks(Long instanceId, Long subInstanceId) {
return dbTaskPersistenceService.deleteAllSubInstanceTasks(instanceId, subInstanceId);
}
@Override
public boolean deleteTasksByTaskIds(Long instanceId, Collection<String> taskId) {
return dbTaskPersistenceService.deleteTasksByTaskIds(instanceId, taskId);
}
/* 重写区 */
@Override
public boolean batchSave(List<TaskDO> tasks) {
long dbNum = dbRecordNum.sum();
if (dbNum > maxActiveTaskNum) {
// 上层保证启用 SWAP 的任务batchSave 的都是等待调度的任务不会参与真正的运行
boolean persistPendingTaskRes = getExternalTaskPersistenceService().persistPendingTask(tasks);
// 仅成功情况累加按严格模式累加防止出现任务无法停止的问题文件系统实际应该比较稳定此处出错概率不高
if (persistPendingTaskRes) {
externalPendingRecordNum.add(tasks.size());
}
log.info("[SwapTaskPersistenceService-{}] too many tasks at runtime(dbRecordNum: {}), SWAP enabled, persistence result: {}, externalPendingRecordNum: {}", instanceId, dbNum, persistPendingTaskRes, externalPendingRecordNum);
return persistPendingTaskRes;
} else {
return persistTask2Db(tasks);
}
}
@Override
public boolean deleteAllTasks(Long instanceId) {
CommonUtils.executeIgnoreException(() -> {
if (externalTaskPersistenceService != null) {
externalTaskPersistenceService.close();
}
});
return dbTaskPersistenceService.deleteAllTasks(instanceId);
}
@Override
public Map<TaskStatus, Long> getTaskStatusStatistics(Long instanceId, Long subInstanceId) {
Map<TaskStatus, Long> taskStatusStatistics = dbTaskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId);
if (!swapEnabled) {
return taskStatusStatistics;
}
long waitingNum = MapUtils.getLongValue(taskStatusStatistics, TaskStatus.WAITING_DISPATCH) + externalPendingRecordNum.sum();
long succeedNum = MapUtils.getLongValue(taskStatusStatistics, TaskStatus.WORKER_PROCESS_SUCCESS) + externalSucceedRecordNum.sum();
long failedNum = MapUtils.getLongValue(taskStatusStatistics, TaskStatus.WORKER_PROCESS_FAILED) + externalFailedRecordNum.sum();
taskStatusStatistics.put(TaskStatus.WAITING_DISPATCH, waitingNum);
taskStatusStatistics.put(TaskStatus.WORKER_PROCESS_SUCCESS, succeedNum);
taskStatusStatistics.put(TaskStatus.WORKER_PROCESS_FAILED, failedNum);
return taskStatusStatistics;
}
@Override
public List<TaskResult> getAllTaskResult(Long instanceId, Long subInstanceId) {
List<TaskResult> dbTaskResult = dbTaskPersistenceService.getAllTaskResult(instanceId, subInstanceId);
if (!swapEnabled) {
return dbTaskResult;
}
List<TaskResult> allTaskResult = Lists.newLinkedList(dbTaskResult);
while (true) {
List<TaskDO> externalTask = externalTaskPersistenceService.readFinishedTask();
if (CollectionUtils.isEmpty(externalTask)) {
break;
}
externalTask.forEach(t -> {
TaskResult taskResult = new TaskResult();
taskResult.setTaskId(t.getTaskId());
taskResult.setSuccess(TaskStatus.WORKER_PROCESS_SUCCESS.getValue() == t.getStatus());
taskResult.setResult(t.getResult());
allTaskResult.add(taskResult);
});
}
return allTaskResult;
// TODO: 后续支持 stream 流式 reduce
}
private class YuGong extends SafeRunnable {
@Override
protected void run0() {
CommonUtils.easySleep(DEFAULT_SCHEDULE_TIME);
moveInPendingTask();
moveOutFinishedTask();
}
private void moveInPendingTask() {
while (true) {
// 外部存储无数据无需扫描
if (externalPendingRecordNum.sum() <= 0) {
return;
}
// 到达 DB 最大数量后跳出扫描
if (dbRecordNum.sum() > maxActiveTaskNum) {
return;
}
List<TaskDO> taskDOS = getExternalTaskPersistenceService().readPendingTask();
// 队列空则跳出循环等待下一次扫描
if (CollectionUtils.isEmpty(taskDOS)) {
log.debug("[YuGong-{}] [moveInPendingTask] readPendingTask from external is empty, finished this loop!", instanceId);
return;
}
// 一旦读取无论结果如何都直接减数量无论后续结果如何
externalPendingRecordNum.add(-taskDOS.size());
boolean persistTask2Db = persistTask2Db(taskDOS);
log.info("[YuGong-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistTask2Db, dbRecordNum, externalPendingRecordNum);
// 持久化失败的情况及时跳出本次循环防止损失扩大等待下次扫描
if (!persistTask2Db) {
log.error("[YuGong-{}] [moveInPendingTask] moveIn task failed, these tasks are lost: {}", instanceId, taskDOS);
return;
}
}
}
private void moveOutFinishedTask() {
while (true) {
// 一旦启动 SWAP需要移出更多的数据来灌入最多允许
long maxRemainNum = maxActiveTaskNum / 2;
if (dbRecordNum.sum() <= maxRemainNum) {
return;
}
List<TaskDO> succeedTasks = dbTaskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WORKER_PROCESS_SUCCESS, 100);
if (!CollectionUtils.isEmpty(succeedTasks)) {
moveOutDetailFinishedTask(succeedTasks, true);
// 优先搬运成功数据100% 已固化失败任务可能还夜长梦多
continue;
}
List<TaskDO> failedTask = dbTaskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WORKER_PROCESS_FAILED, 100);
// 还没有已完成任务产生 or 移完了先整体 finished 跳出循环等待下个调度周期
if (CollectionUtils.isEmpty(failedTask)) {
return;
}
moveOutDetailFinishedTask(failedTask, false);
}
}
private void moveOutDetailFinishedTask(List<TaskDO> tasks, boolean success) {
String logKey = String.format("[YuGong-%d] [moveOut%sTask] ", instanceId, success ? "Success" : "Failed");
boolean persistFinishedTask2ExternalResult = getExternalTaskPersistenceService().persistFinishedTask(tasks);
if (!persistFinishedTask2ExternalResult) {
log.warn("{} persistFinishedTask to external failed, skip this stage!", logKey);
}
LongAdder externalRecord = success ? externalSucceedRecordNum : externalFailedRecordNum;
// 持久化成功直接记录数量无论 DB 是否删除外部数据已存在100% 会被并入统计
int moveOutNum = tasks.size();
externalRecord.add(moveOutNum);
List<String> deleteTaskIds = tasks.stream().map(TaskDO::getTaskId).collect(Collectors.toList());
boolean deleteTasksByTaskIdsResult = dbTaskPersistenceService.deleteTasksByTaskIds(instanceId, deleteTaskIds);
if (deleteTasksByTaskIdsResult) {
dbRecordNum.add(-moveOutNum);
log.info("{} move task to external successfully(movedNum: {}, currentExternalRecordNum: {}, currentDbRecordNum: {})", logKey, moveOutNum, externalRecord, dbRecordNum);
} else {
// DB 删除失败影响不大reduce 重复而已
log.warn("{} persistFinishedTask to external successfully but delete in runtime failed(movedNum: {}, currentExternalRecordNum: {}, currentDbRecordNum: {}), these taskIds may have duplicate results in reduce stage: {}", logKey, moveOutNum, externalRecord, dbRecordNum, deleteTaskIds);
}
}
}
private boolean persistTask2Db(List<TaskDO> taskDOS) {
dbRecordNum.add(taskDOS.size());
return dbTaskPersistenceService.batchSave(taskDOS);
}
private ExternalTaskPersistenceService getExternalTaskPersistenceService() {
if (externalTaskPersistenceService != null) {
return externalTaskPersistenceService;
}
synchronized (this) {
if (externalTaskPersistenceService != null) {
return externalTaskPersistenceService;
}
// 初始化 SWAP 相关内容
this.swapEnabled = true;
this.externalTaskPersistenceService = new ExternalTaskFileSystemPersistenceService(instanceId, needResult);
new Thread(new YuGong(), "PJ-YuGong-" + instanceId).start();
return externalTaskPersistenceService;
}
}
}

View File

@ -4,6 +4,7 @@ package tech.powerjob.worker.persistence;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.core.processor.TaskResult;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -18,8 +19,6 @@ public interface TaskPersistenceService {
void init() throws Exception;
boolean save(TaskDO task);
boolean batchSave(List<TaskDO> tasks);
boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity);
@ -30,8 +29,6 @@ public interface TaskPersistenceService {
Optional<TaskDO> getLastTask(Long instanceId, Long subInstanceId);
List<TaskDO> getAllTask(Long instanceId, Long subInstanceId);
List<TaskDO> getAllUnFinishedTaskByAddress(Long instanceId, String address);
List<TaskDO> getTaskByStatus(Long instanceId, TaskStatus status, int limit);
@ -45,4 +42,6 @@ public interface TaskPersistenceService {
boolean deleteAllTasks(Long instanceId);
boolean deleteAllSubInstanceTasks(Long instanceId, Long subInstanceId);
boolean deleteTasksByTaskIds(Long instanceId, Collection<String> taskId);
}

View File

@ -2,6 +2,10 @@ package tech.powerjob.worker.persistence.db;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.utils.CollectionUtils;
import java.util.Collection;
import java.util.stream.Collectors;
/**
* 简单查询直接类只支持 select * from task_info where xxx = xxx and xxx = xxx 的查询
@ -15,6 +19,9 @@ public class SimpleTaskQuery {
private static final String LINK = " and ";
private String taskId;
private Collection<String> taskIds;
private Long subInstanceId;
private Long instanceId;
private String taskName;
@ -36,6 +43,11 @@ public class SimpleTaskQuery {
if (!StringUtils.isEmpty(taskId)) {
sb.append("task_id = '").append(taskId).append("'").append(LINK);
}
if (!CollectionUtils.isEmpty(taskIds)) {
String taskIdsInQuery = taskIds.stream().map(id -> String.format("'%s'", id)).collect(Collectors.joining(", "));
sb.append("task_id in (").append(taskIdsInQuery).append(")").append(LINK);
}
if (subInstanceId != null) {
sb.append("sub_instance_id = ").append(subInstanceId).append(LINK);
}

View File

@ -0,0 +1,28 @@
package tech.powerjob.worker.persistence.db;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
/**
* SimpleTaskQueryTest
*
* @author tjq
* @since 2024/2/24
*/
class SimpleTaskQueryTest {
@Test
void test() {
SimpleTaskQuery simpleTaskQuery = new SimpleTaskQuery();
simpleTaskQuery.setInstanceId(10086L);
simpleTaskQuery.setTaskIds(Lists.newArrayList("taskId1", "taskId2", "taskId3"));
String queryCondition = simpleTaskQuery.getQueryCondition();
System.out.println(queryCondition);
assertEquals("task_id in ('taskId1', 'taskId2', 'taskId3') and instance_id = 10086", queryCondition);
}
}