feat: [ops] enhance Map/MapReduce's dev ops

tjq 2024-02-25 12:23:58 +08:00
parent 37ef35bd80
commit 07e0e17ec0
10 changed files with 191 additions and 63 deletions

View File

@@ -16,6 +16,10 @@ public class TaskDetailInfo implements PowerSerializable {
private String taskId;
private String taskName;
/**
 * task content, i.e. the mapped subTask object
 */
private String taskContent;
/**
 * processor address
 */

View File

@@ -12,6 +12,15 @@ import java.util.Map;
*/
public class MapUtils {
public static <K> Long getLong(Map<? super K, ?> map, K key, Long defaultValue) {
    Long answer = getLong(map, key);
    if (answer == null) {
        answer = defaultValue;
    }
    return answer;
}

public static <K> long getLongValue(Map<? super K, ?> map, K key) {
    Long longObject = getLong(map, key);
    return longObject == null ? 0L : longObject;
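
For orientation, a minimal usage sketch of these helpers — assuming, as the code above implies, that the two-argument getLong(map, key) returns null for a missing key (the map contents here are illustrative):

Map<String, Object> params = new HashMap<>();
params.put("batchSize", 200L);
Long batchSize = MapUtils.getLong(params, "batchSize", 100L); // 200: key present
Long num = MapUtils.getLong(params, "num", 100000L);          // 100000: missing key falls back to the default
long cost = MapUtils.getLongValue(params, "bizProcessCost");  // 0: missing key maps to 0L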

View File

@@ -20,6 +20,10 @@ public class TaskDetailInfoVO implements Serializable {
private String taskId;
private String taskName;
/**
 * task content, i.e. the mapped subTask object
 */
private String taskContent;
/**
 * processor address
 */

View File

@@ -1,21 +1,25 @@
package tech.powerjob.samples.processors;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.MapUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import tech.powerjob.worker.log.OmsLogger;

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
/**
* MapReduce processor example
@@ -31,70 +35,156 @@ public class MapReduceProcessorDemo implements MapReduceProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
    // Logging API provided by PowerJob. The console can switch between several log modes (view online / print locally). Best practice: do all logging through OmsLogger — set the console to "online log" during development for convenience, then switch to "local log" after going live, at which point it behaves exactly like plain SLF4J.
    OmsLogger omsLogger = context.getOmsLogger();
    // Whether this is the root task; the root task normally performs the fan-out.
    boolean isRootTask = isRootTask();
    // Task name. Apart from the MAP task, every taskName is chosen by the developer, so it can double as a parameter: with multi-level MAPs, for instance, taskName can be Map_Level1, Map_Level2, ..., and the name decides which execution branch to enter.
    String taskName = context.getTaskName();
    // Job params: the parameters configured for the job in the console.
    String jobParamsStr = context.getJobParams();
    // Instance params: the parameters supplied when the instance is launched, equivalent to those carried by an OpenAPI runJob call.
    String instanceParamsStr = context.getInstanceParams();
    omsLogger.info("[MapReduceDemo] [startExecuteNewTask] jobId:{}, instanceId:{}, taskId:{}, taskName: {}, RetryTimes: {}, isRootTask:{}, jobParams:{}, instanceParams:{}", context.getJobId(), context.getInstanceId(), context.getTaskId(), taskName, context.getCurrentRetryTimes(), isRootTask, jobParamsStr, instanceParamsStr);
    // Common pattern: read parameters from InstanceParams first and fall back to JobParams. Maximum flexibility — instance params effectively override job params.
    String finalParams = StringUtils.isEmpty(instanceParamsStr) ? jobParamsStr : instanceParamsStr;
    final JSONObject params = Optional.ofNullable(finalParams).map(JSONObject::parseObject).orElse(new JSONObject());
    if (isRootTask) {
        omsLogger.info("[MapReduceDemo] [RootTask] start execute root task~");
        /*
         * The core of the root task is splitting sub-tasks according to your own business needs, e.g.:
         * - to compute over a batch of records from a database / data warehouse, the MAP task can stream the whole table and dispatch every N IDs as one SubTask
         * - to parse tens of millions of files, the MAP task can dispatch every N file names as one SubTask, and each sub-task parses the files it receives
         *
         * e.g. read 1,000,000 IDs from a file and process the matching database rows:
         * 1. the root task (RootTask) streams the 1,000,000 IDs from the file and dispatches them as sub-tasks in batches of 100
         * 2. non-root tasks receive the sub-tasks and run the business logic
         *
         * The demo below simulates that flow.
         */
        // Build the sub-tasks.
        // Total number of IDs to read.
        Long num = MapUtils.getLong(params, "num", 100000L);
        // Number of IDs per sub-task. The bigger the value, the heavier each sub-task and the higher the retry cost on failure; the smaller the value, the lighter each sub-task, but the shard count grows and so does PowerJob's bookkeeping overhead. Tune it to the business.
        Long batchSize = MapUtils.getLong(params, "batchSize", 100L);
        // Simulate reading num IDs from a file, packing batchSize IDs into each sub-task as one shard.
        List<Long> ids = Lists.newArrayList();
        for (long i = 0; i < num; i++) {
            ids.add(i);
            if (ids.size() >= batchSize) {
                // Build your own sub-task and pass along every parameter it needs.
                SubTask subTask = new SubTask(ThreadLocalRandom.current().nextLong(), Lists.newArrayList(ids), "extra");
                ids.clear();
                try {
                    /*
                     First argument: List<subTask>. map supports batching to cut network IO and improve performance; for simplicity this demo dispatches one sub-task at a time — optimize as needed.
                     Second argument: the sub-task name, i.e. the value later read from TaskContext#taskName. It can also double as a parameter: with multi-level MAPs, taskName can be Map_Level1, Map_Level2, ..., and the name decides which execution branch to enter.
                     */
                    map(Lists.newArrayList(subTask), "L1_FILE_PROCESS");
                } catch (Exception e) {
                    // Note that map can throw; catch and handle as appropriate.
                    omsLogger.error("[MapReduceDemo] map task failed!", e);
                    throw e;
                }
            }
        }
        if (!ids.isEmpty()) {
            // Dispatch the final partial batch as well; otherwise the trailing ids would be silently dropped.
            map(Lists.newArrayList(new SubTask(ThreadLocalRandom.current().nextLong(), Lists.newArrayList(ids), "extra")), "L1_FILE_PROCESS");
        }
        // Result of the map phase. Any failure above rethrows, so reaching this line means success; hence the unconditional success — adjust to taste.
        return new ProcessResult(true, "MAP_SUCCESS,totalNum:" + num);
    }
    // In a simple two-level structure (ROOT -> SubTask), anything that is not the root must be a sub-task, so no further check is needed. Otherwise, match on TaskContext#taskName (string comparison) or on custom fields of the TaskContext#subTask object to pick the target execution branch.
    // Read the parameters handed over by the upstream map call and run the business logic.
    SubTask subTask = (SubTask) context.getSubTask();
    log.info("[MapReduceDemo] [SubTask] taskId:{}, taskName: {}, subTask: {}", context.getTaskId(), taskName, JsonUtils.toJSONString(subTask));
    Thread.sleep(MapUtils.getLong(params, "bizProcessCost", 233L));
    // Simulate a mix of successes and failures; real code should judge by the actual business outcome.
    long successRate = MapUtils.getLong(params, "successRate", 80L);
    long randomNum = ThreadLocalRandom.current().nextLong(100);
    if (successRate > randomNum) {
        return new ProcessResult(true, "PROCESS_SUCCESS:" + randomNum);
    } else {
        return new ProcessResult(false, "PROCESS_FAILED:" + randomNum);
    }
}
@Override
public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
    // Sub-task results can be large; reporting them to the online log would create IO pressure, so print them with the local logger only.
    log.info("List<TaskResult>: {}", JSONObject.toJSONString(taskResults));
    OmsLogger omsLogger = context.getOmsLogger();
    omsLogger.info("================ MapReduceProcessorDemo#reduce ================");
    // reduce runs once every task has finished; taskResults holds the results of all sub-tasks. Because all of them are kept in memory, the cost is significant when the sub-task count is huge — avoid reduce for very large jobs, or use streaming reduce (in development).
    // Example usage: aggregate the execution results.
    AtomicLong successCnt = new AtomicLong(0);
    AtomicLong failedCnt = new AtomicLong(0);
    taskResults.forEach(tr -> {
        if (tr.isSuccess()) {
            successCnt.incrementAndGet();
        } else {
            failedCnt.incrementAndGet();
        }
    });
    double successRate = 1.0 * successCnt.get() / (successCnt.get() + failedCnt.get());
    String resultMsg = String.format("succeedTaskNum:%d,failedTaskNum:%d,successRate:%f", successCnt.get(), failedCnt.get(), successRate);
    omsLogger.info("[MapReduceDemo] [Reduce] {}", resultMsg);
    // The reduce result becomes the final result of the whole job instance.
    if (successRate > 0.8) {
        return new ProcessResult(true, resultMsg);
    } else {
        return new ProcessResult(false, resultMsg);
    }
}
/**
 * Custom sub-task: define it however the business requires.
 * Note that the class representing sub-task parameters MUST have a no-arg constructor (it really, really must),
 * and adding getters/setters as well reduces the odds of serialization problems.
 */
@Data
@AllArgsConstructor
private static class SubTask implements Serializable {
    /**
     * Once more: the no-arg constructor is mandatory!
     */
    public SubTask() {
    }

    private Long siteId;
    private List<Long> idList;
    private String extra;
}
}
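
For reference, a job-params payload that exercises every knob the demo reads via MapUtils.getLong (num, batchSize, bizProcessCost, successRate) could look like this — the values are illustrative:

{"num": 1000000, "batchSize": 100, "bizProcessCost": 50, "successRate": 95}

With instance params left empty, this JSON is picked up from JobParams by the fallback logic at the top of process().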

View File

@@ -16,17 +16,18 @@ import java.util.Set;
@AllArgsConstructor
public enum TaskStatus {
WAITING_DISPATCH(1, "waiting to be dispatched by the scheduler", "dispatching"),
DISPATCH_SUCCESS_WORKER_UNCHECK(2, "dispatched, but receipt by the worker is not guaranteed", "unreceived"),
WORKER_RECEIVED(3, "received by the worker but not yet started", "received"),
WORKER_PROCESSING(4, "running on the worker", "running"),
WORKER_PROCESS_FAILED(5, "execution failed on the worker", "failed"),
WORKER_PROCESS_SUCCESS(6, "execution succeeded on the worker", "succeed");

public static final Set<Integer> FINISHED_STATUS = Sets.newHashSet(WORKER_PROCESS_FAILED.value, WORKER_PROCESS_SUCCESS.value);

private final int value;
private final String des;
private final String simplyDesc;
public static TaskStatus of(int v) {
for (TaskStatus taskStatus : values()) {
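
A quick sketch of how the new compact label is meant to be consumed (TaskConverter below calls getSimplyDesc(); the getter is assumed to be Lombok-generated on this enum):

TaskStatus status = TaskStatus.of(4);   // WORKER_PROCESSING
String label = status.getSimplyDesc();  // "running" — a terse label better suited to ops views than the enum name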

View File

@@ -4,6 +4,7 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.SystemInstanceResult;
@@ -113,11 +114,12 @@ public class CommonTaskTracker extends HeavyTaskTracker {
detail.setTaskDetail(taskDetail);
// Fill in the most recent task results.
if (StringUtils.isNotEmpty(req.getCustomQuery())) {
    String customQuery = req.getCustomQuery().concat(" limit 10");
    List<TaskDO> queriedTaskDos = taskPersistenceService.getTaskByQuery(instanceId, customQuery);
    List<TaskDetailInfo> taskDetailInfoList = Optional.ofNullable(queriedTaskDos).orElse(Collections.emptyList()).stream().map(TaskConverter::taskDo2TaskDetail).collect(Collectors.toList());
    detail.setQueriedTaskDetailInfoList(taskDetailInfoList);
}
return detail;
}
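
Since TaskDAOImpl (below) builds the statement as "select * from task_info where " + condition, the caller-supplied customQuery is a raw SQL condition fragment. An illustrative value:

status in (5, 6) order by last_modified_time desc

which the code above caps with " limit 10" before handing it to the persistence service.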

View File

@@ -250,6 +250,7 @@ public class DbTaskPersistenceService implements TaskPersistenceService {
SimpleTaskQuery simpleTaskQuery = new SimpleTaskQuery();
simpleTaskQuery.setInstanceId(instanceId);
simpleTaskQuery.setFullCustomQueryCondition(customQuery);
simpleTaskQuery.setReadOnly(true);
try {
    return execute(() -> taskDAO.simpleQuery(simpleTaskQuery), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTaskByQuery cost {}ms", instanceId, cost));
} catch (Exception e) {

View File

@@ -44,6 +44,12 @@ public class SimpleTaskQuery {
 */
private String fullCustomQueryCondition;
/**
 * Whether to run the query in read-only mode.
 * In theory every query here could be read-only, but under the minimal-change principle, readOnly is only set for the new feature.
 */
private boolean readOnly = false;
public String getQueryCondition() {
StringBuilder sb = new StringBuilder();
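
A sketch of how the new flag travels with a query, mirroring the DbTaskPersistenceService change above (the setters are assumed to be Lombok-generated):

SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setFullCustomQueryCondition("status in (5, 6) order by last_modified_time desc limit 10");
query.setReadOnly(true); // lets TaskDAOImpl call Connection#setReadOnly before executing
List<TaskDO> tasks = taskDAO.simpleQuery(query);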

View File

@@ -99,6 +99,9 @@ public class TaskDAOImpl implements TaskDAO {
String sql = "select * from task_info where " + query.getQueryCondition();
List<TaskDO> result = Lists.newLinkedList();
try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
    if (query.isReadOnly()) {
        // pure read path: mark the connection read-only so the underlying store can optimize
        conn.setReadOnly(true);
    }
    rs = ps.executeQuery();
    while (rs.next()) {
        result.add(convert(rs));

View File

@@ -1,6 +1,8 @@
package tech.powerjob.worker.pojo.converter;
import tech.powerjob.common.model.TaskDetailInfo;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.serialize.SerializerUtils;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.persistence.TaskDO;
@@ -17,13 +19,19 @@ public class TaskConverter {
taskDetailInfo.setTaskId(taskDO.getTaskId())
        .setTaskName(taskDO.getTaskName())
        .setStatus(taskDO.getStatus())
        .setStatusStr(TaskStatus.of(taskDetailInfo.getStatus()).getSimplyDesc())
        .setResult(taskDO.getResult())
        .setFailedCnt(taskDO.getFailedCnt())
        .setProcessorAddress(taskDO.getAddress())
        .setCreatedTime(taskDO.getCreatedTime())
        .setLastModifiedTime(taskDO.getLastModifiedTime())
        .setLastReportTime(taskDO.getLastReportTime());
try {
    // taskContent stores the serialized sub-task; deserialize it and render it as JSON for the ops view
    taskDetailInfo.setTaskContent(JsonUtils.toJSONString(SerializerUtils.deSerialized(taskDO.getTaskContent())));
} catch (Exception ignore) {
}
return taskDetailInfo;
}