From 4046ea39b526add8726a8ae58401a731cd9dc316 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 24 Feb 2024 20:58:48 +0800 Subject: [PATCH] feat: support TaskTrackerBehavior(PADDLING) --- .../common/enums/TaskTrackerBehavior.java | 42 +++++++++++++++++++ .../model/JobAdvancedRuntimeConfig.java | 25 +++++++++++ .../common/request/ServerScheduleJobReq.java | 5 +++ .../request/http/SaveJobInfoRequest.java | 5 +++ .../powerjob/common/response/JobInfoDTO.java | 2 + .../core/service/impl/job/JobConverter.java | 10 +++++ .../core/service/impl/job/JobServiceImpl.java | 4 ++ .../persistence/remote/model/JobInfoDO.java | 6 +++ .../server/web/response/JobInfoVO.java | 9 ++++ .../worker/core/tracker/task/TaskTracker.java | 7 ++++ .../tracker/task/heavy/HeavyTaskTracker.java | 16 ++++++- .../worker/pojo/model/InstanceInfo.java | 2 + .../pojo/request/TaskTrackerStartTaskReq.java | 2 + 13 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/enums/TaskTrackerBehavior.java create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/model/JobAdvancedRuntimeConfig.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/enums/TaskTrackerBehavior.java b/powerjob-common/src/main/java/tech/powerjob/common/enums/TaskTrackerBehavior.java new file mode 100644 index 00000000..3d081e6c --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/enums/TaskTrackerBehavior.java @@ -0,0 +1,42 @@ +package tech.powerjob.common.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * TaskTracker 行为枚举 + * + * @author tjq + * @since 2024/2/24 + */ +@Getter +@AllArgsConstructor +public enum TaskTrackerBehavior { + + /** + * 普通:不特殊处理,参与集群计算,会导致 TaskTracker 负载比常规节点高。适用于节点数不那么多,任务不那么繁重的场景 + */ + NORMAL(1), + /** + * 划水:只负责管理节点,不参与计算,稳定性最优。适用于节点数量非常多的大规模计算场景,少一个计算节点来换取稳定性提升 + */ + PADDLING(11) + ; + + + private final Integer v; + + public static TaskTrackerBehavior of(Integer type) { + + if (type == null) { + return NORMAL; + } + + for (TaskTrackerBehavior t : values()) { + if (t.v.equals(type)) { + return t; + } + } + return NORMAL; + } +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/model/JobAdvancedRuntimeConfig.java b/powerjob-common/src/main/java/tech/powerjob/common/model/JobAdvancedRuntimeConfig.java new file mode 100644 index 00000000..1d3690a2 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/model/JobAdvancedRuntimeConfig.java @@ -0,0 +1,25 @@ +package tech.powerjob.common.model; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; + +/** + * 任务运行时高级配置 + * + * @author tjq + * @since 2024/2/24 + */ +@Getter +@Setter +@ToString +@Accessors(chain = true) +public class JobAdvancedRuntimeConfig { + + /** + * MR 任务专享参数,TaskTracker 行为 {@link tech.powerjob.common.enums.TaskTrackerBehavior} + */ + private Integer taskTrackerBehavior; + +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java index ac8df503..93702f23 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java @@ -97,4 +97,9 @@ public class ServerScheduleJobReq implements PowerSerializable { * 日志配置 */ private String logConfig; + + /** + * 高级运行时配置 + */ + private String advancedRuntimeConfig; } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java index d3006ec3..4b709736 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java @@ -5,6 +5,7 @@ import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.AlarmConfig; +import tech.powerjob.common.model.JobAdvancedRuntimeConfig; import tech.powerjob.common.model.LogConfig; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.utils.CommonUtils; @@ -155,6 +156,10 @@ public class SaveJobInfoRequest { */ private LogConfig logConfig; + /** + * 高级运行时配置 + */ + private JobAdvancedRuntimeConfig advancedRuntimeConfig; /** * Check non-null properties. diff --git a/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java b/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java index 2f616c20..9b62d2e6 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java @@ -2,6 +2,7 @@ package tech.powerjob.common.response; import lombok.Data; import tech.powerjob.common.model.AlarmConfig; +import tech.powerjob.common.model.JobAdvancedRuntimeConfig; import tech.powerjob.common.model.LogConfig; import java.util.Date; @@ -144,4 +145,5 @@ public class JobInfoDTO { */ private LogConfig logConfig; + private JobAdvancedRuntimeConfig advancedRuntimeConfig; } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobConverter.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobConverter.java index 7c45f4f2..b1ec2ae1 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobConverter.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobConverter.java @@ -9,6 +9,7 @@ import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.AlarmConfig; +import tech.powerjob.common.model.JobAdvancedRuntimeConfig; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.model.LogConfig; import tech.powerjob.common.request.http.SaveJobInfoRequest; @@ -40,6 +41,7 @@ public class JobConverter { saveJobInfoRequest.setLifeCycle(LifeCycle.parse(jobInfoDO.getLifecycle())); saveJobInfoRequest.setAlarmConfig(JsonUtils.parseObjectIgnoreException(jobInfoDO.getAlarmConfig(), AlarmConfig.class)); saveJobInfoRequest.setLogConfig(JsonUtils.parseObjectIgnoreException(jobInfoDO.getLogConfig(), LogConfig.class)); + saveJobInfoRequest.setAdvancedRuntimeConfig(JsonUtils.parseObjectIgnoreException(jobInfoDO.getAdvancedRuntimeConfig(), JobAdvancedRuntimeConfig.class)); return saveJobInfoRequest; } @@ -49,6 +51,14 @@ public class JobConverter { if (jobInfoDO.getAlarmConfig() != null) { jobInfoDTO.setAlarmConfig(JSON.parseObject(jobInfoDO.getAlarmConfig(), AlarmConfig.class)); } + + if (StringUtils.isNotEmpty(jobInfoDO.getLogConfig())) { + jobInfoDTO.setLogConfig(JSON.parseObject(jobInfoDO.getLogConfig(), LogConfig.class)); + } + + if (StringUtils.isNotEmpty(jobInfoDO.getAdvancedRuntimeConfig())) { + jobInfoDTO.setAdvancedRuntimeConfig(JSON.parseObject(jobInfoDO.getAdvancedRuntimeConfig(), JobAdvancedRuntimeConfig.class)); + } return jobInfoDTO; } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobServiceImpl.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobServiceImpl.java index e76345b3..6ed9e9ce 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobServiceImpl.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobServiceImpl.java @@ -116,6 +116,10 @@ public class JobServiceImpl implements JobService { if (request.getLogConfig() != null) { jobInfoDO.setLogConfig(JSONObject.toJSONString(request.getLogConfig())); } + // 日志配置 + if (request.getAdvancedRuntimeConfig() != null) { + jobInfoDO.setAdvancedRuntimeConfig(JSONObject.toJSONString(request.getAdvancedRuntimeConfig())); + } JobInfoDO res = jobInfoRepository.saveAndFlush(jobInfoDO); return res.getId(); } diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/JobInfoDO.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/JobInfoDO.java index 212f10ce..95702354 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/JobInfoDO.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/JobInfoDO.java @@ -163,4 +163,10 @@ public class JobInfoDO { * 日志配置,包括日志级别、日志方式等配置信息 */ private String logConfig; + + /** + * 高级运行时配置 + * 不需要用于索引的高级运行参数,后续统一存储到这里,便于版本升级(尽可能保证数据库表结构稳定) + */ + private String advancedRuntimeConfig; } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java index 98bb9f0d..61c3a084 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java @@ -7,6 +7,7 @@ import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.AlarmConfig; +import tech.powerjob.common.model.JobAdvancedRuntimeConfig; import tech.powerjob.common.model.LogConfig; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.utils.CommonUtils; @@ -161,6 +162,8 @@ public class JobInfoVO { */ private LogConfig logConfig; + private JobAdvancedRuntimeConfig advancedRuntimeConfig; + public static JobInfoVO from(JobInfoDO jobInfoDO) { JobInfoVO jobInfoVO = new JobInfoVO(); BeanUtils.copyProperties(jobInfoDO, jobInfoVO); @@ -199,6 +202,12 @@ public class JobInfoVO { jobInfoVO.setLogConfig(new LogConfig()); } + if (StringUtils.isEmpty(jobInfoDO.getAdvancedRuntimeConfig())) { + jobInfoVO.setAdvancedRuntimeConfig(new JobAdvancedRuntimeConfig()); + } else { + jobInfoVO.setAdvancedRuntimeConfig(JSONObject.parseObject(jobInfoDO.getAdvancedRuntimeConfig(), JobAdvancedRuntimeConfig.class)); + } + return jobInfoVO; } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java index dd398485..2836d205 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java @@ -5,14 +5,17 @@ import lombok.extern.slf4j.Slf4j; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.InstanceDetail; +import tech.powerjob.common.model.JobAdvancedRuntimeConfig; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; +import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.pojo.model.InstanceInfo; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -35,6 +38,8 @@ public abstract class TaskTracker { */ protected final InstanceInfo instanceInfo; protected final ExecuteType executeType; + + protected final JobAdvancedRuntimeConfig advancedRuntimeConfig; /** * 追加的工作流上下文数据 * @@ -76,9 +81,11 @@ public abstract class TaskTracker { instanceInfo.setTaskRetryNum(req.getTaskRetryNum()); instanceInfo.setLogConfig(req.getLogConfig()); instanceInfo.setInstanceTimeoutMS(req.getInstanceTimeoutMS()); + instanceInfo.setAdvancedRuntimeConfig(req.getAdvancedRuntimeConfig()); // 常用变量初始化 executeType = ExecuteType.valueOf(req.getExecuteType()); + advancedRuntimeConfig = Optional.ofNullable(req.getAdvancedRuntimeConfig()).map(x -> JsonUtils.parseObjectIgnoreException(x, JobAdvancedRuntimeConfig.class)).orElse(new JobAdvancedRuntimeConfig()); // 特殊处理超时时间 if (instanceInfo.getInstanceTimeoutMS() <= 0) { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 47568a4e..41fca7c2 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -11,6 +11,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.enums.ExecuteType; +import tech.powerjob.common.enums.TaskTrackerBehavior; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.WorkerQueryExecutorClusterReq; @@ -489,7 +490,13 @@ public abstract class HeavyTaskTracker extends TaskTracker { // 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address String ptAddress = task.getAddress(); if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) { - ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); + if (taskNeedByPassTaskTracker()) { + do { + ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); + } while (workerRuntime.getWorkerAddress().equals(ptAddress)); + } else { + ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); + } } dispatchTask(task, ptAddress); }); @@ -502,6 +509,13 @@ public abstract class HeavyTaskTracker extends TaskTracker { log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop()); } + + private boolean taskNeedByPassTaskTracker() { + if (ExecuteType.MAP.equals(executeType) || ExecuteType.MAP_REDUCE.equals(executeType)) { + return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior()); + } + return false; + } } /** diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java index c53e9b5b..985b5799 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java @@ -53,4 +53,6 @@ public class InstanceInfo implements Serializable { private int taskRetryNum; private String logConfig; + + private String advancedRuntimeConfig; } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java index 7481ce5e..7fbd1ff2 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java @@ -34,6 +34,7 @@ public class TaskTrackerStartTaskReq implements PowerSerializable { private String logConfig; + private String advancedRuntimeConfig; /** * 创建 TaskTrackerStartTaskReq,该构造方法必须在 TaskTracker 节点调用 @@ -51,5 +52,6 @@ public class TaskTrackerStartTaskReq implements PowerSerializable { this.subInstanceId = task.getSubInstanceId(); this.logConfig = instanceInfo.getLogConfig(); + this.advancedRuntimeConfig = instanceInfo.getAdvancedRuntimeConfig(); } }