mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: [SuperMR] Map/MapReduce job can use swap to support ∞ subtask
This commit is contained in:
parent
cf4ed93812
commit
c717fd3fb8
@ -1,8 +1,10 @@
|
||||
package tech.powerjob.common.serialize;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.MapperFeature;
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -28,8 +30,13 @@ public class JsonUtils {
|
||||
.configure(MapperFeature.PROPAGATE_TRANSIENT_MARKER, true)
|
||||
.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
|
||||
.configure(JsonParser.Feature.IGNORE_UNDEFINED, true)
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||
.build();
|
||||
|
||||
static {
|
||||
JSON_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
|
||||
}
|
||||
|
||||
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>> () {};
|
||||
|
||||
private JsonUtils(){
|
||||
@ -37,6 +44,9 @@ public class JsonUtils {
|
||||
}
|
||||
|
||||
public static String toJSONString(Object obj) {
|
||||
if (obj == null) {
|
||||
return null;
|
||||
}
|
||||
if (obj instanceof String) {
|
||||
return (String) obj;
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ import java.util.concurrent.ThreadLocalRandom;
|
||||
* @since 2020/4/17
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Component("testMapReduceProcessor")
|
||||
public class MapReduceProcessorDemo implements MapReduceProcessor {
|
||||
|
||||
@Override
|
||||
|
@ -19,7 +19,10 @@ import tech.powerjob.worker.common.constants.TaskConstant;
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.common.utils.TransportUtils;
|
||||
import tech.powerjob.worker.core.processor.TaskResult;
|
||||
import tech.powerjob.worker.persistence.SwapTaskPersistenceService;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.TaskPersistenceService;
|
||||
import tech.powerjob.worker.pojo.model.InstanceInfo;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -51,6 +54,11 @@ public class CommonTaskTracker extends HeavyTaskTracker {
|
||||
super(req, workerRuntime);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TaskPersistenceService initTaskPersistenceService(InstanceInfo instanceInfo, WorkerRuntime workerRuntime) {
|
||||
return new SwapTaskPersistenceService(instanceInfo, workerRuntime.getTaskPersistenceService());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initTaskTracker(ServerScheduleJobReq req) {
|
||||
|
||||
|
@ -30,6 +30,7 @@ import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager;
|
||||
import tech.powerjob.worker.core.tracker.task.TaskTracker;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.TaskPersistenceService;
|
||||
import tech.powerjob.worker.pojo.model.InstanceInfo;
|
||||
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
|
||||
import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq;
|
||||
@ -82,7 +83,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
||||
// 保护性操作
|
||||
instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency()));
|
||||
this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress());
|
||||
this.taskPersistenceService = workerRuntime.getTaskPersistenceService();
|
||||
this.taskPersistenceService = initTaskPersistenceService(instanceInfo, workerRuntime);
|
||||
// 构建缓存
|
||||
taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).softValues().build();
|
||||
|
||||
@ -95,6 +96,10 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
||||
log.info("[TaskTracker-{}] create TaskTracker successfully.", instanceId);
|
||||
}
|
||||
|
||||
protected TaskPersistenceService initTaskPersistenceService(InstanceInfo instanceInfo, WorkerRuntime workerRuntime) {
|
||||
return workerRuntime.getTaskPersistenceService();
|
||||
}
|
||||
|
||||
/**
|
||||
* 静态方法创建 TaskTracker
|
||||
*
|
||||
|
@ -4,6 +4,7 @@ import com.google.common.collect.Lists;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import tech.powerjob.common.PowerJobDKey;
|
||||
import tech.powerjob.common.enhance.SafeRunnable;
|
||||
import tech.powerjob.common.enums.ExecuteType;
|
||||
import tech.powerjob.common.utils.CollectionUtils;
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.common.utils.MapUtils;
|
||||
@ -11,6 +12,7 @@ import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.core.processor.TaskResult;
|
||||
import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService;
|
||||
import tech.powerjob.worker.persistence.fs.impl.ExternalTaskFileSystemPersistenceService;
|
||||
import tech.powerjob.worker.pojo.model.InstanceInfo;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
@ -42,9 +44,11 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
|
||||
private final LongAdder externalFailedRecordNum = new LongAdder();
|
||||
|
||||
private final boolean needResult;
|
||||
private final boolean canUseSwap;
|
||||
private final TaskPersistenceService dbTaskPersistenceService;
|
||||
|
||||
private boolean swapEnabled;
|
||||
private volatile boolean finished = false;
|
||||
private ExternalTaskPersistenceService externalTaskPersistenceService;
|
||||
|
||||
private static final long DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM = 100000;
|
||||
@ -54,11 +58,14 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
|
||||
*/
|
||||
private static final long DEFAULT_SCHEDULE_TIME = 60000;
|
||||
|
||||
public SwapTaskPersistenceService(Long instanceId, boolean needResult, TaskPersistenceService dbTaskPersistenceService) {
|
||||
this.instanceId = instanceId;
|
||||
this.needResult = needResult;
|
||||
public SwapTaskPersistenceService(InstanceInfo instanceInfo, TaskPersistenceService dbTaskPersistenceService) {
|
||||
this.instanceId = instanceInfo.getInstanceId();
|
||||
this.needResult = ExecuteType.MAP_REDUCE.name().equalsIgnoreCase(instanceInfo.getExecuteType());
|
||||
this.canUseSwap = ExecuteType.MAP.name().equalsIgnoreCase(instanceInfo.getExecuteType()) || ExecuteType.MAP_REDUCE.name().equalsIgnoreCase(instanceInfo.getExecuteType());
|
||||
this.dbTaskPersistenceService = dbTaskPersistenceService;
|
||||
this.maxActiveTaskNum = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM, String.valueOf(DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM)));
|
||||
|
||||
log.info("[SwapTaskPersistenceService-{}] initialized SwapTaskPersistenceService, canUseSwap: {}, needResult: {}, maxActiveTaskNum: {}", instanceId, canUseSwap, needResult, maxActiveTaskNum);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -117,7 +124,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
|
||||
|
||||
long dbNum = dbRecordNum.sum();
|
||||
|
||||
if (dbNum > maxActiveTaskNum) {
|
||||
if (canUseSwap && dbNum > maxActiveTaskNum) {
|
||||
|
||||
// 上层保证启用 SWAP 的任务,batchSave 的都是等待调度的任务,不会参与真正的运行
|
||||
boolean persistPendingTaskRes = getExternalTaskPersistenceService().persistPendingTask(tasks);
|
||||
@ -136,8 +143,9 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
|
||||
|
||||
@Override
|
||||
public boolean deleteAllTasks(Long instanceId) {
|
||||
finished = true;
|
||||
CommonUtils.executeIgnoreException(() -> {
|
||||
if (externalTaskPersistenceService != null) {
|
||||
if (swapEnabled) {
|
||||
externalTaskPersistenceService.close();
|
||||
}
|
||||
});
|
||||
@ -195,11 +203,17 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
|
||||
|
||||
@Override
|
||||
protected void run0() {
|
||||
while (true) {
|
||||
|
||||
CommonUtils.easySleep(DEFAULT_SCHEDULE_TIME);
|
||||
if (finished) {
|
||||
return;
|
||||
}
|
||||
|
||||
moveInPendingTask();
|
||||
moveOutFinishedTask();
|
||||
CommonUtils.easySleep(DEFAULT_SCHEDULE_TIME);
|
||||
|
||||
moveInPendingTask();
|
||||
moveOutFinishedTask();
|
||||
}
|
||||
}
|
||||
|
||||
private void moveInPendingTask() {
|
||||
@ -220,7 +234,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
|
||||
|
||||
// 队列空则跳出循环,等待下一次扫描
|
||||
if (CollectionUtils.isEmpty(taskDOS)) {
|
||||
log.debug("[YuGong-{}] [moveInPendingTask] readPendingTask from external is empty, finished this loop!", instanceId);
|
||||
log.debug("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask from external is empty, finished this loop!", instanceId);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -228,11 +242,11 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
|
||||
externalPendingRecordNum.add(-taskDOS.size());
|
||||
|
||||
boolean persistTask2Db = persistTask2Db(taskDOS);
|
||||
log.info("[YuGong-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistTask2Db, dbRecordNum, externalPendingRecordNum);
|
||||
log.info("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistTask2Db, dbRecordNum, externalPendingRecordNum);
|
||||
|
||||
// 持久化失败的情况,及时跳出本次循环,防止损失扩大,等待下次扫描
|
||||
if (!persistTask2Db) {
|
||||
log.error("[YuGong-{}] [moveInPendingTask] moveIn task failed, these tasks are lost: {}", instanceId, taskDOS);
|
||||
log.error("[SwapTaskPersistenceService-{}] [moveInPendingTask] moveIn task failed, these tasks are lost: {}", instanceId, taskDOS);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -268,7 +282,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
|
||||
|
||||
private void moveOutDetailFinishedTask(List<TaskDO> tasks, boolean success) {
|
||||
|
||||
String logKey = String.format("[YuGong-%d] [moveOut%sTask] ", instanceId, success ? "Success" : "Failed");
|
||||
String logKey = String.format("[SwapTaskPersistenceService-%d] [moveOut%sTask] ", instanceId, success ? "Success" : "Failed");
|
||||
|
||||
boolean persistFinishedTask2ExternalResult = getExternalTaskPersistenceService().persistFinishedTask(tasks);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user