feat: support TaskTrackerBehavior(PADDLING)

This commit is contained in:
tjq 2024-02-24 20:58:48 +08:00
parent 815d44ef7e
commit 4046ea39b5
13 changed files with 134 additions and 1 deletions

View File

@ -0,0 +1,42 @@
package tech.powerjob.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* TaskTracker 行为枚举
*
* @author tjq
* @since 2024/2/24
*/
@Getter
@AllArgsConstructor
public enum TaskTrackerBehavior {
/**
* 普通不特殊处理参与集群计算会导致 TaskTracker 负载比常规节点高适用于节点数不那么多任务不那么繁重的场景
*/
NORMAL(1),
/**
* 划水只负责管理节点不参与计算稳定性最优适用于节点数量非常多的大规模计算场景少一个计算节点来换取稳定性提升
*/
PADDLING(11)
;
private final Integer v;
public static TaskTrackerBehavior of(Integer type) {
if (type == null) {
return NORMAL;
}
for (TaskTrackerBehavior t : values()) {
if (t.v.equals(type)) {
return t;
}
}
return NORMAL;
}
}

View File

@ -0,0 +1,25 @@
package tech.powerjob.common.model;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
/**
* 任务运行时高级配置
*
* @author tjq
* @since 2024/2/24
*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class JobAdvancedRuntimeConfig {
/**
* MR 任务专享参数TaskTracker 行为 {@link tech.powerjob.common.enums.TaskTrackerBehavior}
*/
private Integer taskTrackerBehavior;
}

View File

@ -97,4 +97,9 @@ public class ServerScheduleJobReq implements PowerSerializable {
* 日志配置
*/
private String logConfig;
/**
* 高级运行时配置
*/
private String advancedRuntimeConfig;
}

View File

@ -5,6 +5,7 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.JobAdvancedRuntimeConfig;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.utils.CommonUtils;
@ -155,6 +156,10 @@ public class SaveJobInfoRequest {
*/
private LogConfig logConfig;
/**
* 高级运行时配置
*/
private JobAdvancedRuntimeConfig advancedRuntimeConfig;
/**
* Check non-null properties.

View File

@ -2,6 +2,7 @@ package tech.powerjob.common.response;
import lombok.Data;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.JobAdvancedRuntimeConfig;
import tech.powerjob.common.model.LogConfig;
import java.util.Date;
@ -144,4 +145,5 @@ public class JobInfoDTO {
*/
private LogConfig logConfig;
private JobAdvancedRuntimeConfig advancedRuntimeConfig;
}

View File

@ -9,6 +9,7 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.JobAdvancedRuntimeConfig;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
@ -40,6 +41,7 @@ public class JobConverter {
saveJobInfoRequest.setLifeCycle(LifeCycle.parse(jobInfoDO.getLifecycle()));
saveJobInfoRequest.setAlarmConfig(JsonUtils.parseObjectIgnoreException(jobInfoDO.getAlarmConfig(), AlarmConfig.class));
saveJobInfoRequest.setLogConfig(JsonUtils.parseObjectIgnoreException(jobInfoDO.getLogConfig(), LogConfig.class));
saveJobInfoRequest.setAdvancedRuntimeConfig(JsonUtils.parseObjectIgnoreException(jobInfoDO.getAdvancedRuntimeConfig(), JobAdvancedRuntimeConfig.class));
return saveJobInfoRequest;
}
@ -49,6 +51,14 @@ public class JobConverter {
if (jobInfoDO.getAlarmConfig() != null) {
jobInfoDTO.setAlarmConfig(JSON.parseObject(jobInfoDO.getAlarmConfig(), AlarmConfig.class));
}
if (StringUtils.isNotEmpty(jobInfoDO.getLogConfig())) {
jobInfoDTO.setLogConfig(JSON.parseObject(jobInfoDO.getLogConfig(), LogConfig.class));
}
if (StringUtils.isNotEmpty(jobInfoDO.getAdvancedRuntimeConfig())) {
jobInfoDTO.setAdvancedRuntimeConfig(JSON.parseObject(jobInfoDO.getAdvancedRuntimeConfig(), JobAdvancedRuntimeConfig.class));
}
return jobInfoDTO;
}

View File

@ -116,6 +116,10 @@ public class JobServiceImpl implements JobService {
if (request.getLogConfig() != null) {
jobInfoDO.setLogConfig(JSONObject.toJSONString(request.getLogConfig()));
}
// 日志配置
if (request.getAdvancedRuntimeConfig() != null) {
jobInfoDO.setAdvancedRuntimeConfig(JSONObject.toJSONString(request.getAdvancedRuntimeConfig()));
}
JobInfoDO res = jobInfoRepository.saveAndFlush(jobInfoDO);
return res.getId();
}

View File

@ -163,4 +163,10 @@ public class JobInfoDO {
* 日志配置包括日志级别日志方式等配置信息
*/
private String logConfig;
/**
* 高级运行时配置
* 不需要用于索引的高级运行参数后续统一存储到这里便于版本升级尽可能保证数据库表结构稳定
*/
private String advancedRuntimeConfig;
}

View File

@ -7,6 +7,7 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.JobAdvancedRuntimeConfig;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.utils.CommonUtils;
@ -161,6 +162,8 @@ public class JobInfoVO {
*/
private LogConfig logConfig;
private JobAdvancedRuntimeConfig advancedRuntimeConfig;
public static JobInfoVO from(JobInfoDO jobInfoDO) {
JobInfoVO jobInfoVO = new JobInfoVO();
BeanUtils.copyProperties(jobInfoDO, jobInfoVO);
@ -199,6 +202,12 @@ public class JobInfoVO {
jobInfoVO.setLogConfig(new LogConfig());
}
if (StringUtils.isEmpty(jobInfoDO.getAdvancedRuntimeConfig())) {
jobInfoVO.setAdvancedRuntimeConfig(new JobAdvancedRuntimeConfig());
} else {
jobInfoVO.setAdvancedRuntimeConfig(JSONObject.parseObject(jobInfoDO.getAdvancedRuntimeConfig(), JobAdvancedRuntimeConfig.class));
}
return jobInfoVO;
}
}

View File

@ -5,14 +5,17 @@ import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.model.JobAdvancedRuntimeConfig;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.pojo.model.InstanceInfo;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -35,6 +38,8 @@ public abstract class TaskTracker {
*/
protected final InstanceInfo instanceInfo;
protected final ExecuteType executeType;
protected final JobAdvancedRuntimeConfig advancedRuntimeConfig;
/**
* 追加的工作流上下文数据
*
@ -76,9 +81,11 @@ public abstract class TaskTracker {
instanceInfo.setTaskRetryNum(req.getTaskRetryNum());
instanceInfo.setLogConfig(req.getLogConfig());
instanceInfo.setInstanceTimeoutMS(req.getInstanceTimeoutMS());
instanceInfo.setAdvancedRuntimeConfig(req.getAdvancedRuntimeConfig());
// 常用变量初始化
executeType = ExecuteType.valueOf(req.getExecuteType());
advancedRuntimeConfig = Optional.ofNullable(req.getAdvancedRuntimeConfig()).map(x -> JsonUtils.parseObjectIgnoreException(x, JobAdvancedRuntimeConfig.class)).orElse(new JobAdvancedRuntimeConfig());
// 特殊处理超时时间
if (instanceInfo.getInstanceTimeoutMS() <= 0) {

View File

@ -11,6 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.TaskTrackerBehavior;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.WorkerQueryExecutorClusterReq;
@ -489,7 +490,13 @@ public abstract class HeavyTaskTracker extends TaskTracker {
// 获取 ProcessorTracker 地址如果 Task 中自带了 Address则使用该 Address
String ptAddress = task.getAddress();
if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
if (taskNeedByPassTaskTracker()) {
do {
ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
} while (workerRuntime.getWorkerAddress().equals(ptAddress));
} else {
ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
}
}
dispatchTask(task, ptAddress);
});
@ -502,6 +509,13 @@ public abstract class HeavyTaskTracker extends TaskTracker {
log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
}
private boolean taskNeedByPassTaskTracker() {
if (ExecuteType.MAP.equals(executeType) || ExecuteType.MAP_REDUCE.equals(executeType)) {
return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior());
}
return false;
}
}
/**

View File

@ -53,4 +53,6 @@ public class InstanceInfo implements Serializable {
private int taskRetryNum;
private String logConfig;
private String advancedRuntimeConfig;
}

View File

@ -34,6 +34,7 @@ public class TaskTrackerStartTaskReq implements PowerSerializable {
private String logConfig;
private String advancedRuntimeConfig;
/**
* 创建 TaskTrackerStartTaskReq该构造方法必须在 TaskTracker 节点调用
@ -51,5 +52,6 @@ public class TaskTrackerStartTaskReq implements PowerSerializable {
this.subInstanceId = task.getSubInstanceId();
this.logConfig = instanceInfo.getLogConfig();
this.advancedRuntimeConfig = instanceInfo.getAdvancedRuntimeConfig();
}
}