feat: [ops] enhance Map/MapReduce's dev ops

This commit is contained in:
tjq 2024-02-25 02:12:19 +08:00
parent 4046ea39b5
commit 37ef35bd80
20 changed files with 291 additions and 27 deletions

View File

@ -1,8 +1,8 @@
package tech.powerjob.common.model;
import tech.powerjob.common.PowerSerializable;
import lombok.Data;
import lombok.NoArgsConstructor;
import tech.powerjob.common.PowerSerializable;
import java.util.List;
@ -51,8 +51,15 @@ public class InstanceDetail implements PowerSerializable {
/**
* Task detail, used by MapReduce or Broadcast tasks.
* NOTE: the name is slightly misleading; this field actually holds task statistics.
*/
private TaskDetail taskDetail;
/**
* 查询出来的 Task 详细结果
*/
private List<TaskDetailInfo> queriedTaskDetailInfoList;
/**
* Sub instance details, used by frequent tasks.
*/
@ -92,5 +99,14 @@ public class InstanceDetail implements PowerSerializable {
private long totalTaskNum;
private long succeedTaskNum;
private long failedTaskNum;
// 等待派发状态仅存在 TaskTracker 数据库中
protected Long waitingDispatchTaskNum;
// 已派发 ProcessorTracker 未确认可能由于网络错误请求未送达也有可能 ProcessorTracker 线程池满拒绝执行
protected Long workerUnreceivedTaskNum;
// Acknowledged by the ProcessorTracker and waiting in its thread-pool queue to be executed
protected Long receivedTaskNum;
// ProcessorTracker正在执行
protected Long runningTaskNum;
}
}

View File

@ -0,0 +1,39 @@
package tech.powerjob.common.model;
import lombok.Data;
import lombok.experimental.Accessors;
import tech.powerjob.common.PowerSerializable;
/**
* Detailed information about a single task, as a transferable model object.
* Setters are chainable (see {@code @Accessors(chain = true)}).
*
* @author tjq
* @since 2024/2/25
*/
@Data
@Accessors(chain = true)
public class TaskDetailInfo implements PowerSerializable {
// Identifier of the task
private String taskId;
// Name of the task
private String taskName;
/**
* Address of the processor handling the task
*/
private String processorAddress;
// Numeric task status code
private Integer status;
// Textual form of the status (TaskConverter fills it with the TaskStatus enum constant name)
private String statusStr;
// Result of the task
private String result;
// Failure count of the task
private Integer failedCnt;
/**
* Creation time (presumably epoch milliseconds; confirm against the producer)
*/
private Long createdTime;
/**
* Last modification time
*/
private Long lastModifiedTime;
/**
* Time of the last report from the ProcessorTracker
*/
private Long lastReportTime;
}

View File

@ -17,4 +17,11 @@ import tech.powerjob.common.PowerSerializable;
public class ServerQueryInstanceStatusReq implements PowerSerializable {
// Id of the instance whose status is being queried
private Long instanceId;
/**
* Custom query.
* Aimed at advanced users: exposes direct querying of the underlying task store to ease
* operations and troubleshooting.
* Only the query condition is carried here; "select *" is prepended and "limit" is appended
* by the side that executes it.
*/
private String customQuery;
}

View File

@ -265,7 +265,7 @@ public class InstanceService {
* @return 详细运行状态
*/
@DesignateServer
public InstanceDetail getInstanceDetail(Long appId, Long instanceId) {
public InstanceDetail getInstanceDetail(Long appId, Long instanceId, String customQuery) {
InstanceInfoDO instanceInfoDO = fetchInstanceInfo(instanceId);
@ -283,7 +283,7 @@ public class InstanceService {
Optional<WorkerInfo> workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfoDO.getAppId(), instanceInfoDO.getTaskTrackerAddress());
if (workerInfoOpt.isPresent()) {
WorkerInfo workerInfo = workerInfoOpt.get();
ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId);
ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId, customQuery);
try {
final URL url = ServerURLFactory.queryInstance2Worker(workerInfo.getAddress());
AskResponse askResponse = transportService.ask(workerInfo.getProtocol(), url, req, AskResponse.class)

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.web.controller;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.server.common.utils.OmsFileUtils;
@ -10,6 +11,7 @@ import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository
import tech.powerjob.server.core.service.CacheService;
import tech.powerjob.server.core.instance.InstanceLogService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.web.request.QueryInstanceDetailRequest;
import tech.powerjob.server.web.request.QueryInstanceRequest;
import tech.powerjob.server.web.response.InstanceDetailVO;
import tech.powerjob.server.web.response.InstanceInfoVO;
@ -29,6 +31,7 @@ import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.net.URL;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@ -69,13 +72,28 @@ public class InstanceController {
// GET endpoint: wraps appId / instanceId into a QueryInstanceDetailRequest (without a custom query)
// and delegates to getInstanceDetailPlus.
@GetMapping("/detail")
public ResultDTO<InstanceDetailVO> getInstanceDetail(Long appId, Long instanceId) {
QueryInstanceDetailRequest queryInstanceDetailRequest = new QueryInstanceDetailRequest();
queryInstanceDetailRequest.setAppId(appId);
queryInstanceDetailRequest.setInstanceId(instanceId);
return getInstanceDetailPlus(queryInstanceDetailRequest);
}
// Compatibility: older front-ends do not send appId
if (appId == null) {
appId = instanceService.getInstanceInfo(instanceId).getAppId();
// POST endpoint: same lookup as /detail, but the request body may also carry a custom query condition
// that is passed through to InstanceService#getInstanceDetail.
@PostMapping("/detailPlus")
public ResultDTO<InstanceDetailVO> getInstanceDetailPlus(@RequestBody QueryInstanceDetailRequest req) {
// Validate the request: reject custom queries containing data-modifying / DDL keywords
String customQuery = req.getCustomQuery();
String nonNullCustomQuery = Optional.ofNullable(customQuery).orElse(OmsConstant.NONE);
// NOTE(review): assuming this is commons-lang3 StringUtils, the check is a case-insensitive SUBSTRING match.
// A legitimate condition on a column such as created_time therefore trips the "CREATE" keyword and is rejected.
// NOTE(review): a keyword blacklist is not a reliable defence for a condition that is later concatenated into SQL;
// consider word-boundary matching plus an allow-list of columns/operators, or restricting this endpoint to admins.
if (StringUtils.containsAnyIgnoreCase(nonNullCustomQuery, "delete", "update", "insert", "drop", "CREATE", "ALTER", "TRUNCATE", "RENAME", "LOCK", "GRANT", "REVOKE", "PREPARE", "EXECUTE", "COMMIT", "BEGIN")) {
throw new IllegalArgumentException("Don't get any ideas about the database, illegally query: " + customQuery);
}
return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(appId, instanceId)));
// Compatibility: older front-ends do not send appId, so resolve it from the instance record
if (req.getAppId() == null) {
req.setAppId(instanceService.getInstanceInfo(req.getInstanceId()).getAppId());
}
return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(req.getAppId(), req.getInstanceId(), customQuery)));
}
@GetMapping("/log")

View File

@ -0,0 +1,24 @@
package tech.powerjob.server.web.request;
import lombok.Data;
import java.io.Serializable;
/**
* Request object for querying the details of a job instance.
*
* @author tjq
* @since 2024/2/25
*/
@Data
public class QueryInstanceDetailRequest implements Serializable {
// Id of the application; the controller resolves it from the instance when it is null
private Long appId;
// Id of the instance to query
private Long instanceId;
/**
* Custom query condition (SQL)
*/
private String customQuery;
}

View File

@ -1,15 +1,18 @@
package tech.powerjob.server.web.response;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.utils.CommonUtils;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.utils.CommonUtils;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* 任务实例的运行详细信息对外展示对象
@ -58,7 +61,11 @@ public class InstanceDetailVO {
/**
* MR或BD任务专用
*/
private InstanceDetailVO.TaskDetail taskDetail;
private InstanceDetailVO.InstanceTaskStats instanceTaskStats;
/**
* 查询出来的 Task 详细结果
*/
private List<TaskDetailInfoVO> queriedTaskDetailInfoList;
/**
* 秒级任务专用
*/
@ -87,10 +94,19 @@ public class InstanceDetailVO {
*/
@Data
@NoArgsConstructor
public static class TaskDetail implements PowerSerializable {
public static class InstanceTaskStats implements PowerSerializable {
private long totalTaskNum;
private long succeedTaskNum;
private long failedTaskNum;
// 等待派发状态仅存在 TaskTracker 数据库中
protected Long waitingDispatchTaskNum;
// 已派发 ProcessorTracker 未确认可能由于网络错误请求未送达也有可能 ProcessorTracker 线程池满拒绝执行
protected Long workerUnreceivedTaskNum;
// Acknowledged by the ProcessorTracker and waiting in its thread-pool queue to be executed
protected Long receivedTaskNum;
// ProcessorTracker正在执行
protected Long runningTaskNum;
}
public static InstanceDetailVO from(InstanceDetail origin) {
@ -104,9 +120,9 @@ public class InstanceDetailVO {
// 拷贝 TaskDetail
if (origin.getTaskDetail() != null) {
TaskDetail voDetail = new TaskDetail();
InstanceTaskStats voDetail = new InstanceTaskStats();
BeanUtils.copyProperties(origin.getTaskDetail(), voDetail);
vo.setTaskDetail(voDetail);
vo.setInstanceTaskStats(voDetail);
}
// 拷贝秒级任务数据
@ -126,6 +142,10 @@ public class InstanceDetailVO {
});
}
// 拷贝 MR Task 结果
List<TaskDetailInfoVO> taskDetailInfoVOList = Optional.ofNullable(origin.getQueriedTaskDetailInfoList()).orElse(Collections.emptyList()).stream().map(TaskDetailInfoVO::from).collect(Collectors.toList());
vo.setQueriedTaskDetailInfoList(taskDetailInfoVOList);
return vo;
}
}

View File

@ -0,0 +1,58 @@
package tech.powerjob.server.web.response;
import lombok.Data;
import lombok.experimental.Accessors;
import org.springframework.beans.BeanUtils;
import tech.powerjob.common.model.TaskDetailInfo;
import tech.powerjob.common.utils.CommonUtils;
import java.io.Serializable;
/**
 * View object exposing the details of a single task, including formatted
 * string versions of its timestamps.
 *
 * @author tjq
 * @since 2024/2/25
 */
@Data
@Accessors(chain = true)
public class TaskDetailInfoVO implements Serializable {

    private String taskId;
    private String taskName;
    /**
     * Address of the processor handling the task
     */
    private String processorAddress;
    private Integer status;
    private String statusStr;
    private String result;
    private Integer failedCnt;
    /**
     * Creation time, raw value and formatted string
     */
    private Long createdTime;
    private String createdTimeStr;
    /**
     * Last modification time, raw value and formatted string
     */
    private Long lastModifiedTime;
    private String lastModifiedTimeStr;
    /**
     * Time of the last report from the ProcessorTracker, raw value and formatted string
     */
    private Long lastReportTime;
    private String lastReportTimeStr;

    /**
     * Builds a view object from the transport model.
     * Same-named properties are copied over first, then the three
     * {@code *TimeStr} fields are derived from the copied timestamps.
     *
     * @param taskDetailInfo source model
     * @return the populated view object
     */
    public static TaskDetailInfoVO from(TaskDetailInfo taskDetailInfo) {
        TaskDetailInfoVO converted = new TaskDetailInfoVO();
        BeanUtils.copyProperties(taskDetailInfo, converted);
        // Chainable setters: each call returns the same instance
        return converted
                .setCreatedTimeStr(CommonUtils.formatTime(converted.getCreatedTime()))
                .setLastModifiedTimeStr(CommonUtils.formatTime(converted.getLastModifiedTime()))
                .setLastReportTimeStr(CommonUtils.formatTime(converted.getLastReportTime()));
    }
}

View File

@ -202,7 +202,7 @@ public class TaskTrackerActor {
log.warn("[TaskTrackerActor] receive ServerQueryInstanceStatusReq({}) but system can't find TaskTracker.", req);
askResponse = AskResponse.failed("can't find TaskTracker");
} else {
InstanceDetail instanceDetail = taskTracker.fetchRunningStatus();
InstanceDetail instanceDetail = taskTracker.fetchRunningStatus(req);
askResponse = AskResponse.succeed(instanceDetail);
}
return askResponse;

View File

@ -6,6 +6,7 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.model.JobAdvancedRuntimeConfig;
import tech.powerjob.common.request.ServerQueryInstanceStatusReq;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.serialize.JsonUtils;
@ -112,7 +113,7 @@ public abstract class TaskTracker {
*
* @return 任务实例的详细运行状态
*/
public abstract InstanceDetail fetchRunningStatus();
public abstract InstanceDetail fetchRunningStatus(ServerQueryInstanceStatusReq req);
public static void reportCreateErrorToServer(ServerScheduleJobReq req, WorkerRuntime workerRuntime, Exception e) {

View File

@ -11,6 +11,8 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.model.TaskDetailInfo;
import tech.powerjob.common.request.ServerQueryInstanceStatusReq;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.utils.CollectionUtils;
@ -22,13 +24,16 @@ import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.persistence.SwapTaskPersistenceService;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.persistence.TaskPersistenceService;
import tech.powerjob.worker.pojo.converter.TaskConverter;
import tech.powerjob.worker.pojo.model.InstanceInfo;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 负责管理 JobInstance 的运行主要包括任务的派发MR可能存在大量的任务和状态的更新
@ -85,7 +90,7 @@ public class CommonTaskTracker extends HeavyTaskTracker {
}
// Builds the running-status snapshot of this instance: task statistics plus a short list of queried task rows.
@Override
public InstanceDetail fetchRunningStatus() {
public InstanceDetail fetchRunningStatus(ServerQueryInstanceStatusReq req) {
InstanceDetail detail = new InstanceDetail();
// Fill in the basic information
@ -95,18 +100,30 @@ public class CommonTaskTracker extends HeavyTaskTracker {
// Fill in the detailed task statistics
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail();
taskDetail.setSucceedTaskNum(holder.succeedNum);
taskDetail.setFailedTaskNum(holder.failedNum);
taskDetail.setSucceedTaskNum(holder.getSucceedNum());
taskDetail.setFailedTaskNum(holder.getFailedNum());
taskDetail.setTotalTaskNum(holder.getTotalTaskNum());
taskDetail.setWaitingDispatchTaskNum(holder.getWaitingDispatchNum());
taskDetail.setWorkerUnreceivedTaskNum(holder.getWorkerUnreceivedNum());
taskDetail.setReceivedTaskNum(holder.getReceivedNum());
taskDetail.setRunningTaskNum(holder.getRunningNum());
detail.setTaskDetail(taskDetail);
// Fill in the most recent task results
// Default condition when the request carries no custom query.
// NOTE(review): statuses 5 and 6 are presumably the finished (failed / succeeded) states; confirm against TaskStatus.
// NOTE(review): only null falls back to the default; an empty string would be sent on as just " limit 10"; verify the resulting SQL.
String customQuery = Optional.ofNullable(req.getCustomQuery()).orElse(" status in (5, 6) order by last_modified_time ");
// " limit 10" is always appended, so a custom query must not contain its own limit clause
customQuery = customQuery.concat(" limit 10");
List<TaskDO> queriedTaskDos = taskPersistenceService.getTaskByQuery(instanceId, customQuery);
// A null query result is treated as an empty list
List<TaskDetailInfo> taskDetailInfoList = Optional.ofNullable(queriedTaskDos).orElse(Collections.emptyList()).stream().map(TaskConverter::taskDo2TaskDetail).collect(Collectors.toList());
detail.setQueriedTaskDetailInfoList(taskDetailInfoList);
return detail;
}
/**
* 持久化根任务只有完成持久化才能视为任务开始running先持久化再报告server
*/

View File

@ -14,6 +14,7 @@ import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerQueryInstanceStatusReq;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.serialize.JsonUtils;
@ -125,7 +126,7 @@ public class FrequentTaskTracker extends HeavyTaskTracker {
}
@Override
public InstanceDetail fetchRunningStatus() {
public InstanceDetail fetchRunningStatus(ServerQueryInstanceStatusReq req) {
InstanceDetail detail = new InstanceDetail();
// 填充基础信息
detail.setActualTriggerTime(createTime);

View File

@ -7,6 +7,7 @@ import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerQueryInstanceStatusReq;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.enhance.SafeRunnableWrapper;
@ -179,7 +180,7 @@ public class LightTaskTracker extends TaskTracker {
}
@Override
public InstanceDetail fetchRunningStatus() {
public InstanceDetail fetchRunningStatus(ServerQueryInstanceStatusReq req) {
InstanceDetail detail = new InstanceDetail();
// 填充基础信息
detail.setActualTriggerTime(createTime);

View File

@ -245,6 +245,19 @@ public class DbTaskPersistenceService implements TaskPersistenceService {
return Lists.newLinkedList();
}
/**
 * Queries the tasks of one instance using a fully custom query condition.
 * A failing query is logged and reported to the caller as an empty list.
 *
 * @param instanceId  instance whose tasks are queried
 * @param customQuery custom condition, set as the query's full custom query condition
 * @return the tasks returned by the DAO, or a new empty list when the query throws
 */
@Override
public List<TaskDO> getTaskByQuery(Long instanceId, String customQuery) {
    final SimpleTaskQuery query = new SimpleTaskQuery();
    query.setInstanceId(instanceId);
    query.setFullCustomQueryCondition(customQuery);

    List<TaskDO> tasks;
    try {
        tasks = execute(
                () -> taskDAO.simpleQuery(query),
                cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTaskByQuery cost {}ms", instanceId, cost));
    } catch (Exception e) {
        log.error("[TaskPersistenceService] getTaskByQuery for instance(id={}) failed.", instanceId, e);
        tasks = Lists.newLinkedList();
    }
    return tasks;
}
/**
* 根据主键查询 Task
*/

View File

@ -102,6 +102,11 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
return dbTaskPersistenceService.getTaskByStatus(instanceId, status, limit);
}
/**
* Custom-condition task query; delegates directly to the DB-backed persistence service.
*/
@Override
public List<TaskDO> getTaskByQuery(Long instanceId, String customQuery) {
return dbTaskPersistenceService.getTaskByQuery(instanceId, customQuery);
}
@Override
public Optional<TaskDO> getTask(Long instanceId, String taskId) {
return dbTaskPersistenceService.getTask(instanceId, taskId);

View File

@ -67,7 +67,7 @@ public class TaskDO {
*/
private Long lastReportTime;
public String getUpdateSQL() {
public String fetchUpdateSQL() {
StringBuilder sb = new StringBuilder();
// address 有置空需求仅判断 NULL

View File

@ -33,6 +33,8 @@ public interface TaskPersistenceService {
List<TaskDO> getTaskByStatus(Long instanceId, TaskStatus status, int limit);
List<TaskDO> getTaskByQuery(Long instanceId, String customQuery);
Map<TaskStatus, Long> getTaskStatusStatistics(Long instanceId, Long subInstanceId);
List<TaskResult> getAllTaskResult(Long instanceId, Long subInstanceId);

View File

@ -28,17 +28,24 @@ public class SimpleTaskQuery {
private String address;
private Integer status;
// 查询内容默认为 *
private String queryContent = " * ";
// Custom query condition (the clause after WHERE), e.g. created_time > 10086 and status = 3
private String queryCondition;
// 自定义的查询条件 GROUP BY status
private String otherCondition;
// 查询内容默认为 *
private String queryContent = " * ";
private Integer limit;
/**
* 高级模式完全自定义查询 SQL
* 当传入此值时忽略 queryCondition + otherCondition + limit
*/
private String fullCustomQueryCondition;
public String getQueryCondition() {
StringBuilder sb = new StringBuilder();
if (!StringUtils.isEmpty(taskId)) {
sb.append("task_id = '").append(taskId).append("'").append(LINK);
@ -64,6 +71,12 @@ public class SimpleTaskQuery {
sb.append("status = ").append(status).append(LINK);
}
// 自定义查询模式专用
if (StringUtils.isNotEmpty(fullCustomQueryCondition)) {
sb.append(fullCustomQueryCondition);
return sb.toString();
}
if (!StringUtils.isEmpty(queryCondition)) {
sb.append(queryCondition).append(LINK);
}

View File

@ -148,7 +148,7 @@ public class TaskDAOImpl implements TaskDAO {
@Override
public boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) throws SQLException {
String sqlFormat = "update task_info set %s where %s";
String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getQueryCondition());
String updateSQL = String.format(sqlFormat, updateField.fetchUpdateSQL(), condition.getQueryCondition());
try (Connection conn = connectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) {
stat.executeUpdate();
return true;

View File

@ -0,0 +1,29 @@
package tech.powerjob.worker.pojo.converter;
import tech.powerjob.common.model.TaskDetailInfo;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.persistence.TaskDO;
/**
 * Conversions between task-related objects.
 *
 * @author tjq
 * @since 2024/2/25
 */
public class TaskConverter {

    /**
     * Converts a persisted task row into the transferable {@link TaskDetailInfo} model.
     * The textual status is the name of the {@link TaskStatus} matching the numeric status.
     *
     * @param taskDO persisted task
     * @return the populated task detail
     */
    public static TaskDetailInfo taskDo2TaskDetail(TaskDO taskDO) {
        TaskDetailInfo detail = new TaskDetailInfo();
        detail.setTaskId(taskDO.getTaskId());
        detail.setTaskName(taskDO.getTaskName());
        detail.setStatus(taskDO.getStatus());
        // The status was set just above, so it can be read back for the name lookup
        detail.setStatusStr(TaskStatus.of(detail.getStatus()).name());
        detail.setResult(taskDO.getResult());
        detail.setFailedCnt(taskDO.getFailedCnt());
        detail.setProcessorAddress(taskDO.getAddress());
        detail.setCreatedTime(taskDO.getCreatedTime());
        detail.setLastModifiedTime(taskDO.getLastModifiedTime());
        detail.setLastReportTime(taskDO.getLastReportTime());
        return detail;
    }
}