diff --git a/powerjob-common/src/main/java/tech/powerjob/common/model/InstanceDetail.java b/powerjob-common/src/main/java/tech/powerjob/common/model/InstanceDetail.java index f249783d..c728f7a2 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/model/InstanceDetail.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/model/InstanceDetail.java @@ -1,8 +1,8 @@ package tech.powerjob.common.model; -import tech.powerjob.common.PowerSerializable; import lombok.Data; import lombok.NoArgsConstructor; +import tech.powerjob.common.PowerSerializable; import java.util.List; @@ -51,8 +51,15 @@ public class InstanceDetail implements PowerSerializable { /** * Task detail, used by MapReduce or Broadcast tasks. + * 命名有点问题,实际是 task 统计信息 */ private TaskDetail taskDetail; + + /** + * 查询出来的 Task 详细结果 + */ + private List queriedTaskDetailInfoList; + /** * Sub instance details, used by frequent tasks. */ @@ -92,5 +99,14 @@ public class InstanceDetail implements PowerSerializable { private long totalTaskNum; private long succeedTaskNum; private long failedTaskNum; + + // 等待派发状态(仅存在 TaskTracker 数据库中) + protected Long waitingDispatchTaskNum; + // 已派发,但 ProcessorTracker 未确认,可能由于网络错误请求未送达,也有可能 ProcessorTracker 线程池满,拒绝执行 + protected Long workerUnreceivedTaskNum; + // ProcessorTracker确认接收,存在与线程池队列中,排队执行 + protected Long receivedTaskNum; + // ProcessorTracker正在执行 + protected Long runningTaskNum; } } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/model/TaskDetailInfo.java b/powerjob-common/src/main/java/tech/powerjob/common/model/TaskDetailInfo.java new file mode 100644 index 00000000..9605c7a1 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/model/TaskDetailInfo.java @@ -0,0 +1,39 @@ +package tech.powerjob.common.model; + +import lombok.Data; +import lombok.experimental.Accessors; +import tech.powerjob.common.PowerSerializable; + +/** + * Task 详细信息 + * + * @author tjq + * @since 2024/2/25 + */ +@Data +@Accessors(chain = true) +public class TaskDetailInfo implements PowerSerializable { + + private String taskId; + private String taskName; + /** + * 处理器地址 + */ + private String processorAddress; + private Integer status; + private String statusStr; + private String result; + private Integer failedCnt; + /** + * 创建时间 + */ + private Long createdTime; + /** + * 最后修改时间 + */ + private Long lastModifiedTime; + /** + * ProcessorTracker 最后上报时间 + */ + private Long lastReportTime; +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerQueryInstanceStatusReq.java b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerQueryInstanceStatusReq.java index a37dfbf2..e692c623 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerQueryInstanceStatusReq.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerQueryInstanceStatusReq.java @@ -17,4 +17,11 @@ import tech.powerjob.common.PowerSerializable; public class ServerQueryInstanceStatusReq implements PowerSerializable { private Long instanceId; + /** + * 自定义查询 + * 针对高阶用户,直接开放底库查询,便于运维和排查问题 + * 此处只传递查询条件,前置拼接 select *,后置拼接 limit + */ + private String customQuery; + } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java index 29a0cfc2..2f09a052 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java @@ -265,7 +265,7 @@ public class InstanceService { * @return 详细运行状态 */ @DesignateServer - public InstanceDetail getInstanceDetail(Long appId, Long instanceId) { + public InstanceDetail getInstanceDetail(Long appId, Long instanceId, String customQuery) { InstanceInfoDO instanceInfoDO = fetchInstanceInfo(instanceId); @@ -283,7 +283,7 @@ public class InstanceService { Optional workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfoDO.getAppId(), instanceInfoDO.getTaskTrackerAddress()); if (workerInfoOpt.isPresent()) { WorkerInfo workerInfo = workerInfoOpt.get(); - ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId); + ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId, customQuery); try { final URL url = ServerURLFactory.queryInstance2Worker(workerInfo.getAddress()); AskResponse askResponse = transportService.ask(workerInfo.getProtocol(), url, req, AskResponse.class) diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/InstanceController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/InstanceController.java index 008d05e7..56ff5ea0 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/InstanceController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/InstanceController.java @@ -1,5 +1,6 @@ package tech.powerjob.server.web.controller; +import tech.powerjob.common.OmsConstant; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.server.common.utils.OmsFileUtils; @@ -10,6 +11,7 @@ import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository import tech.powerjob.server.core.service.CacheService; import tech.powerjob.server.core.instance.InstanceLogService; import tech.powerjob.server.core.instance.InstanceService; +import tech.powerjob.server.web.request.QueryInstanceDetailRequest; import tech.powerjob.server.web.request.QueryInstanceRequest; import tech.powerjob.server.web.response.InstanceDetailVO; import tech.powerjob.server.web.response.InstanceInfoVO; @@ -29,6 +31,7 @@ import javax.servlet.http.HttpServletResponse; import java.io.File; import java.net.URL; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; @@ -69,13 +72,28 @@ public class InstanceController { @GetMapping("/detail") public ResultDTO getInstanceDetail(Long appId, Long instanceId) { + QueryInstanceDetailRequest queryInstanceDetailRequest = new QueryInstanceDetailRequest(); + queryInstanceDetailRequest.setAppId(appId); + queryInstanceDetailRequest.setInstanceId(instanceId); + return getInstanceDetailPlus(queryInstanceDetailRequest); + } - // 兼容老版本前端不存在 appId 的场景 - if (appId == null) { - appId = instanceService.getInstanceInfo(instanceId).getAppId(); + @PostMapping("/detailPlus") + public ResultDTO getInstanceDetailPlus(@RequestBody QueryInstanceDetailRequest req) { + + // 非法请求参数校验 + String customQuery = req.getCustomQuery(); + String nonNullCustomQuery = Optional.ofNullable(customQuery).orElse(OmsConstant.NONE); + if (StringUtils.containsAnyIgnoreCase(nonNullCustomQuery, "delete", "update", "insert", "drop", "CREATE", "ALTER", "TRUNCATE", "RENAME", "LOCK", "GRANT", "REVOKE", "PREPARE", "EXECUTE", "COMMIT", "BEGIN")) { + throw new IllegalArgumentException("Don't get any ideas about the database, illegally query: " + customQuery); } - return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(appId, instanceId))); + // 兼容老版本前端不存在 appId 的场景 + if (req.getAppId() == null) { + req.setAppId(instanceService.getInstanceInfo(req.getInstanceId()).getAppId()); + } + + return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(req.getAppId(), req.getInstanceId(), customQuery))); } @GetMapping("/log") diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryInstanceDetailRequest.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryInstanceDetailRequest.java new file mode 100644 index 00000000..085a728e --- /dev/null +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryInstanceDetailRequest.java @@ -0,0 +1,24 @@ +package tech.powerjob.server.web.request; + +import lombok.Data; + +import java.io.Serializable; + +/** + * 查询任务详情的请求 + * + * @author tjq + * @since 2024/2/25 + */ +@Data +public class QueryInstanceDetailRequest implements Serializable { + + private Long appId; + + private Long instanceId; + + /** + * 自定义查询条件(SQL) + */ + private String customQuery; +} diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/InstanceDetailVO.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/InstanceDetailVO.java index 704bc960..35667de4 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/InstanceDetailVO.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/InstanceDetailVO.java @@ -1,15 +1,18 @@ package tech.powerjob.server.web.response; -import tech.powerjob.common.PowerSerializable; -import tech.powerjob.common.model.InstanceDetail; -import tech.powerjob.common.utils.CommonUtils; import com.google.common.collect.Lists; import lombok.Data; import lombok.NoArgsConstructor; import org.springframework.beans.BeanUtils; import org.springframework.util.CollectionUtils; +import tech.powerjob.common.PowerSerializable; +import tech.powerjob.common.model.InstanceDetail; +import tech.powerjob.common.utils.CommonUtils; +import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; /** * 任务实例的运行详细信息(对外展示对象) @@ -58,7 +61,11 @@ public class InstanceDetailVO { /** * MR或BD任务专用 */ - private InstanceDetailVO.TaskDetail taskDetail; + private InstanceDetailVO.InstanceTaskStats instanceTaskStats; + /** + * 查询出来的 Task 详细结果 + */ + private List queriedTaskDetailInfoList; /** * 秒级任务专用 */ @@ -87,10 +94,19 @@ public class InstanceDetailVO { */ @Data @NoArgsConstructor - public static class TaskDetail implements PowerSerializable { + public static class InstanceTaskStats implements PowerSerializable { private long totalTaskNum; private long succeedTaskNum; private long failedTaskNum; + + // 等待派发状态(仅存在 TaskTracker 数据库中) + protected Long waitingDispatchTaskNum; + // 已派发,但 ProcessorTracker 未确认,可能由于网络错误请求未送达,也有可能 ProcessorTracker 线程池满,拒绝执行 + protected Long workerUnreceivedTaskNum; + // ProcessorTracker确认接收,存在与线程池队列中,排队执行 + protected Long receivedTaskNum; + // ProcessorTracker正在执行 + protected Long runningTaskNum; } public static InstanceDetailVO from(InstanceDetail origin) { @@ -104,9 +120,9 @@ public class InstanceDetailVO { // 拷贝 TaskDetail if (origin.getTaskDetail() != null) { - TaskDetail voDetail = new TaskDetail(); + InstanceTaskStats voDetail = new InstanceTaskStats(); BeanUtils.copyProperties(origin.getTaskDetail(), voDetail); - vo.setTaskDetail(voDetail); + vo.setInstanceTaskStats(voDetail); } // 拷贝秒级任务数据 @@ -126,6 +142,10 @@ public class InstanceDetailVO { }); } + // 拷贝 MR Task 结果 + List taskDetailInfoVOList = Optional.ofNullable(origin.getQueriedTaskDetailInfoList()).orElse(Collections.emptyList()).stream().map(TaskDetailInfoVO::from).collect(Collectors.toList()); + vo.setQueriedTaskDetailInfoList(taskDetailInfoVOList); + return vo; } } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/TaskDetailInfoVO.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/TaskDetailInfoVO.java new file mode 100644 index 00000000..75fd5317 --- /dev/null +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/TaskDetailInfoVO.java @@ -0,0 +1,58 @@ +package tech.powerjob.server.web.response; + +import lombok.Data; +import lombok.experimental.Accessors; +import org.springframework.beans.BeanUtils; +import tech.powerjob.common.model.TaskDetailInfo; +import tech.powerjob.common.utils.CommonUtils; + +import java.io.Serializable; + +/** + * 任务详情 + * + * @author tjq + * @since 2024/2/25 + */ +@Data +@Accessors(chain = true) +public class TaskDetailInfoVO implements Serializable { + + private String taskId; + private String taskName; + /** + * 处理器地址 + */ + private String processorAddress; + private Integer status; + private String statusStr; + + private String result; + private Integer failedCnt; + /** + * 创建时间 + */ + private Long createdTime; + private String createdTimeStr; + /** + * 最后修改时间 + */ + private Long lastModifiedTime; + private String lastModifiedTimeStr; + /** + * ProcessorTracker 最后上报时间 + */ + private Long lastReportTime; + private String lastReportTimeStr; + + public static TaskDetailInfoVO from(TaskDetailInfo taskDetailInfo) { + TaskDetailInfoVO vo = new TaskDetailInfoVO(); + BeanUtils.copyProperties(taskDetailInfo, vo); + + vo.setCreatedTimeStr(CommonUtils.formatTime(vo.getCreatedTime())); + vo.setLastModifiedTimeStr(CommonUtils.formatTime(vo.getLastModifiedTime())); + vo.setLastReportTimeStr(CommonUtils.formatTime(vo.getLastReportTime())); + + return vo; + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java index 917f3abb..d98da634 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java @@ -202,7 +202,7 @@ public class TaskTrackerActor { log.warn("[TaskTrackerActor] receive ServerQueryInstanceStatusReq({}) but system can't find TaskTracker.", req); askResponse = AskResponse.failed("can't find TaskTracker"); } else { - InstanceDetail instanceDetail = taskTracker.fetchRunningStatus(); + InstanceDetail instanceDetail = taskTracker.fetchRunningStatus(req); askResponse = AskResponse.succeed(instanceDetail); } return askResponse; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java index 2836d205..ad175ce7 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java @@ -6,6 +6,7 @@ import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.model.JobAdvancedRuntimeConfig; +import tech.powerjob.common.request.ServerQueryInstanceStatusReq; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import tech.powerjob.common.serialize.JsonUtils; @@ -112,7 +113,7 @@ public abstract class TaskTracker { * * @return 任务实例的详细运行状态 */ - public abstract InstanceDetail fetchRunningStatus(); + public abstract InstanceDetail fetchRunningStatus(ServerQueryInstanceStatusReq req); public static void reportCreateErrorToServer(ServerScheduleJobReq req, WorkerRuntime workerRuntime, Exception e) { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java index bb354e7a..1035abff 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java @@ -11,6 +11,8 @@ import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.InstanceDetail; +import tech.powerjob.common.model.TaskDetailInfo; +import tech.powerjob.common.request.ServerQueryInstanceStatusReq; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import tech.powerjob.common.utils.CollectionUtils; @@ -22,13 +24,16 @@ import tech.powerjob.worker.core.processor.TaskResult; import tech.powerjob.worker.persistence.SwapTaskPersistenceService; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.persistence.TaskPersistenceService; +import tech.powerjob.worker.pojo.converter.TaskConverter; import tech.powerjob.worker.pojo.model.InstanceInfo; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * 负责管理 JobInstance 的运行,主要包括任务的派发(MR可能存在大量的任务)和状态的更新 @@ -85,7 +90,7 @@ public class CommonTaskTracker extends HeavyTaskTracker { } @Override - public InstanceDetail fetchRunningStatus() { + public InstanceDetail fetchRunningStatus(ServerQueryInstanceStatusReq req) { InstanceDetail detail = new InstanceDetail(); // 填充基础信息 @@ -95,18 +100,30 @@ public class CommonTaskTracker extends HeavyTaskTracker { // 填充详细信息 InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId); + InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail(); - taskDetail.setSucceedTaskNum(holder.succeedNum); - taskDetail.setFailedTaskNum(holder.failedNum); + taskDetail.setSucceedTaskNum(holder.getSucceedNum()); + taskDetail.setFailedTaskNum(holder.getFailedNum()); taskDetail.setTotalTaskNum(holder.getTotalTaskNum()); + taskDetail.setWaitingDispatchTaskNum(holder.getWaitingDispatchNum()); + taskDetail.setWorkerUnreceivedTaskNum(holder.getWorkerUnreceivedNum()); + taskDetail.setReceivedTaskNum(holder.getReceivedNum()); + taskDetail.setRunningTaskNum(holder.getRunningNum()); + detail.setTaskDetail(taskDetail); + // 填充最近的任务结果 + String customQuery = Optional.ofNullable(req.getCustomQuery()).orElse(" status in (5, 6) order by last_modified_time "); + customQuery = customQuery.concat(" limit 10"); + List queriedTaskDos = taskPersistenceService.getTaskByQuery(instanceId, customQuery); + List taskDetailInfoList = Optional.ofNullable(queriedTaskDos).orElse(Collections.emptyList()).stream().map(TaskConverter::taskDo2TaskDetail).collect(Collectors.toList()); + detail.setQueriedTaskDetailInfoList(taskDetailInfoList); + return detail; } - /** * 持久化根任务,只有完成持久化才能视为任务开始running(先持久化,再报告server) */ diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java index 9f706963..716a1e8e 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java @@ -14,6 +14,7 @@ import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.AlarmConfig; import tech.powerjob.common.model.InstanceDetail; +import tech.powerjob.common.request.ServerQueryInstanceStatusReq; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import tech.powerjob.common.serialize.JsonUtils; @@ -125,7 +126,7 @@ public class FrequentTaskTracker extends HeavyTaskTracker { } @Override - public InstanceDetail fetchRunningStatus() { + public InstanceDetail fetchRunningStatus(ServerQueryInstanceStatusReq req) { InstanceDetail detail = new InstanceDetail(); // 填充基础信息 detail.setActualTriggerTime(createTime); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java index 79b58fbf..ecdc289e 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java @@ -7,6 +7,7 @@ import tech.powerjob.common.PowerJobDKey; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.InstanceDetail; +import tech.powerjob.common.request.ServerQueryInstanceStatusReq; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import tech.powerjob.common.enhance.SafeRunnableWrapper; @@ -179,7 +180,7 @@ public class LightTaskTracker extends TaskTracker { } @Override - public InstanceDetail fetchRunningStatus() { + public InstanceDetail fetchRunningStatus(ServerQueryInstanceStatusReq req) { InstanceDetail detail = new InstanceDetail(); // 填充基础信息 detail.setActualTriggerTime(createTime); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java index 307a4836..2803f58e 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java @@ -245,6 +245,19 @@ public class DbTaskPersistenceService implements TaskPersistenceService { return Lists.newLinkedList(); } + @Override + public List getTaskByQuery(Long instanceId, String customQuery) { + SimpleTaskQuery simpleTaskQuery = new SimpleTaskQuery(); + simpleTaskQuery.setInstanceId(instanceId); + simpleTaskQuery.setFullCustomQueryCondition(customQuery); + try { + return execute(() -> taskDAO.simpleQuery(simpleTaskQuery), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTaskByQuery cost {}ms", instanceId, cost)); + }catch (Exception e) { + log.error("[TaskPersistenceService] getTaskByQuery for instance(id={}) failed.", instanceId, e); + } + return Lists.newLinkedList(); + } + /** * 根据主键查询 Task */ diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java index c6571bbe..13e6a6d3 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java @@ -102,6 +102,11 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { return dbTaskPersistenceService.getTaskByStatus(instanceId, status, limit); } + @Override + public List getTaskByQuery(Long instanceId, String customQuery) { + return dbTaskPersistenceService.getTaskByQuery(instanceId, customQuery); + } + @Override public Optional getTask(Long instanceId, String taskId) { return dbTaskPersistenceService.getTask(instanceId, taskId); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDO.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDO.java index 7d815148..ca9909e3 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDO.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDO.java @@ -67,7 +67,7 @@ public class TaskDO { */ private Long lastReportTime; - public String getUpdateSQL() { + public String fetchUpdateSQL() { StringBuilder sb = new StringBuilder(); // address 有置空需求,仅判断 NULL diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java index 2c53575c..589be927 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java @@ -33,6 +33,8 @@ public interface TaskPersistenceService { List getTaskByStatus(Long instanceId, TaskStatus status, int limit); + List getTaskByQuery(Long instanceId, String customQuery); + Map getTaskStatusStatistics(Long instanceId, Long subInstanceId); List getAllTaskResult(Long instanceId, Long subInstanceId); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java index 7372e9a3..efa5aa75 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java @@ -28,17 +28,24 @@ public class SimpleTaskQuery { private String address; private Integer status; + // 查询内容,默认为 * + private String queryContent = " * "; + // 自定义的查询条件(where 后面的语句),如 crated_time > 10086 and status = 3 private String queryCondition; // 自定义的查询条件,如 GROUP BY status private String otherCondition; - // 查询内容,默认为 * - private String queryContent = " * "; - private Integer limit; + /** + * 高级模式,完全自定义查询 SQL + * 当传入此值时忽略 queryCondition + otherCondition + limit + */ + private String fullCustomQueryCondition; + public String getQueryCondition() { + StringBuilder sb = new StringBuilder(); if (!StringUtils.isEmpty(taskId)) { sb.append("task_id = '").append(taskId).append("'").append(LINK); @@ -64,6 +71,12 @@ public class SimpleTaskQuery { sb.append("status = ").append(status).append(LINK); } + // 自定义查询模式专用 + if (StringUtils.isNotEmpty(fullCustomQueryCondition)) { + sb.append(fullCustomQueryCondition); + return sb.toString(); + } + if (!StringUtils.isEmpty(queryCondition)) { sb.append(queryCondition).append(LINK); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAOImpl.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAOImpl.java index 3159d64f..4cb522d7 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAOImpl.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAOImpl.java @@ -148,7 +148,7 @@ public class TaskDAOImpl implements TaskDAO { @Override public boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) throws SQLException { String sqlFormat = "update task_info set %s where %s"; - String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getQueryCondition()); + String updateSQL = String.format(sqlFormat, updateField.fetchUpdateSQL(), condition.getQueryCondition()); try (Connection conn = connectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) { stat.executeUpdate(); return true; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/converter/TaskConverter.java b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/converter/TaskConverter.java new file mode 100644 index 00000000..1139c1ff --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/converter/TaskConverter.java @@ -0,0 +1,29 @@ +package tech.powerjob.worker.pojo.converter; + +import tech.powerjob.common.model.TaskDetailInfo; +import tech.powerjob.worker.common.constants.TaskStatus; +import tech.powerjob.worker.persistence.TaskDO; + +/** + * 任务相关的对象转换 + * + * @author tjq + * @since 2024/2/25 + */ +public class TaskConverter { + + public static TaskDetailInfo taskDo2TaskDetail(TaskDO taskDO) { + TaskDetailInfo taskDetailInfo = new TaskDetailInfo(); + taskDetailInfo.setTaskId(taskDO.getTaskId()) + .setTaskName(taskDO.getTaskName()) + .setStatus(taskDO.getStatus()) + .setStatusStr(TaskStatus.of(taskDetailInfo.getStatus()).name()) + .setResult(taskDO.getResult()) + .setFailedCnt(taskDO.getFailedCnt()) + .setProcessorAddress(taskDO.getAddress()) + .setCreatedTime(taskDO.getCreatedTime()) + .setLastModifiedTime(taskDO.getLastModifiedTime()) + .setLastReportTime(taskDO.getLastReportTime()); + return taskDetailInfo; + } +}