From 3b331e70cca3e326514ff9148041e00d0624bdce Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 10 Apr 2020 13:25:00 +0800 Subject: [PATCH] remove taskTimeLimit because I can't stop the running thread. --- .../github/kfcfans/common/RemoteConstant.java | 3 + .../request/ServerQueryInstanceStatusReq.java | 16 +++++ .../common/request/ServerScheduleJobReq.java | 2 - .../TaskTrackerReportInstanceStatusReq.java | 1 + .../oms/server/akka/actors/FriendActor.java | 38 +++++++++++- .../RedirectServerQueryInstanceStatusReq.java | 17 ++++++ .../persistence/model/ExecuteLogDO.java | 2 + .../server/persistence/model/JobInfoDO.java | 2 - .../oms/server/service/DispatchService.java | 1 - .../service/instance/InstanceManager.java | 9 +++ .../instance/InstanceStatusHolder.java | 2 + .../timing/schedule/JobScheduleService.java | 1 + .../web/controller/InstanceController.java | 61 ++++++++++++------- .../server/web/controller/JobController.java | 1 + .../oms/worker/actors/TaskTrackerActor.java | 28 ++++++++- .../tracker/processor/ProcessorTracker.java | 31 +++++----- .../core/tracker/task/CommonTaskTracker.java | 59 ++++++++++-------- .../tracker/task/FrequentTaskTracker.java | 2 +- .../worker/core/tracker/task/TaskTracker.java | 3 +- .../oms/worker/pojo/model/InstanceInfo.java | 2 - 20 files changed, 207 insertions(+), 74 deletions(-) create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerQueryInstanceStatusReq.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerQueryInstanceStatusReq.java diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java index 76a317d9..f246f5fb 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java @@ -1,5 +1,7 @@ package com.github.kfcfans.common; +import java.time.Duration; + /** * RemoteConstant * @@ -30,4 +32,5 @@ public class RemoteConstant { /* ************************ OTHERS ************************ */ public static final String EMPTY_ADDRESS = "N/A"; + public static final long DEFAULT_TIMEOUT_MS = 5000; } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerQueryInstanceStatusReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerQueryInstanceStatusReq.java new file mode 100644 index 00000000..7d0e10d9 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerQueryInstanceStatusReq.java @@ -0,0 +1,16 @@ +package com.github.kfcfans.common.request; + +import lombok.Data; + +import java.io.Serializable; + +/** + * 服务器查询实例运行状态,需要返回详细的运行数据 + * + * @author tjq + * @since 2020/4/10 + */ +@Data +public class ServerQueryInstanceStatusReq implements Serializable { + private Long instanceId; +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java index 4c41068d..04532c5e 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java @@ -41,8 +41,6 @@ public class ServerScheduleJobReq implements Serializable { */ // 整个任务的总体超时时间 private long instanceTimeoutMS; - // Task的超时时间 - private long taskTimeoutMS; /** * 任务运行参数 diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java index c19d4ecc..6babdbe3 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java @@ -25,6 +25,7 @@ public class TaskTrackerReportInstanceStatusReq implements Serializable { private long succeedTaskNum; private long failedTaskNum; + private long startTime; private long reportTime; private String sourceAddress; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java index 9ea752d4..5ddc9f4c 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java @@ -1,17 +1,26 @@ package com.github.kfcfans.oms.server.akka.actors; import akka.actor.AbstractActor; +import akka.pattern.Patterns; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.request.WorkerHeartbeat; import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.requests.Ping; +import com.github.kfcfans.oms.server.akka.requests.RedirectServerQueryInstanceStatusReq; import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; import com.github.kfcfans.oms.server.service.instance.InstanceManager; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import java.io.Serializable; +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static com.github.kfcfans.common.RemoteConstant.DEFAULT_TIMEOUT_MS; + /** * 处理朋友们的信息(处理服务器与服务器之间的通讯) * @@ -25,13 +34,12 @@ public class FriendActor extends AbstractActor { return receiveBuilder() .match(Ping.class, this::onReceivePing) .match(RedirectServerStopInstanceReq.class, this::onReceiveRedirectServerStopInstanceReq) - .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) + .matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj)) .build(); } /** * 处理存活检测的请求 - * @param ping 存活检测请求 */ private void onReceivePing(Ping ping) { AskResponse askResponse = new AskResponse(); @@ -42,7 +50,6 @@ public class FriendActor extends AbstractActor { /** * 处理停止任务实例的请求 - * @param req 停止运行任务实例 */ private void onReceiveRedirectServerStopInstanceReq(RedirectServerStopInstanceReq req) { @@ -58,4 +65,29 @@ public class FriendActor extends AbstractActor { // 空,可能刚经历 Server 变更 或 TaskTracker 宕机。先忽略吧,打条日志压压惊 log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}), so stop instance may fail.", instanceId); } + + /** + * 处理Server查询任务实例运行情况的请求 + */ + private void onReceiveRedirectServerQueryInstanceStatusReq(RedirectServerQueryInstanceStatusReq req) { + + Long instanceId = req.getReq().getInstanceId(); + String taskTrackerAddress = InstanceManager.getTaskTrackerAddress(instanceId); + AskResponse response = new AskResponse(); + if (StringUtils.isEmpty(taskTrackerAddress)) { + response.setSuccess(false); + response.setExtra("can't find TaskTracker"); + log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}).", instanceId); + }else { + try { + CompletionStage ask = Patterns.ask(OhMyServer.getTaskTrackerActor(taskTrackerAddress), req.getReq(), Duration.ofMillis(DEFAULT_TIMEOUT_MS)); + response = (AskResponse) ask.toCompletableFuture().get(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + }catch (Exception e) { + log.warn("[FriendActor] Ask TaskTracker for instance(instanceId={}) status failed.", instanceId, e); + response.setSuccess(false); + response.setExtra(e.getMessage()); + } + } + getSender().tell(response, getSelf()); + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerQueryInstanceStatusReq.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerQueryInstanceStatusReq.java new file mode 100644 index 00000000..d883e13f --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerQueryInstanceStatusReq.java @@ -0,0 +1,17 @@ +package com.github.kfcfans.oms.server.akka.requests; + +import com.github.kfcfans.common.request.ServerQueryInstanceStatusReq; +import lombok.Data; + +import java.io.Serializable; + +/** + * 重定向 ServerQueryInstanceStatusReq + * + * @author tjq + * @since 2020/4/10 + */ +@Data +public class RedirectServerQueryInstanceStatusReq implements Serializable { + private ServerQueryInstanceStatusReq req; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java index 8bc94bac..69f38a06 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java @@ -22,6 +22,8 @@ public class ExecuteLogDO { // 任务ID private Long jobId; + // 任务所属应用的ID,冗余提高查询效率 + private Long appId; // 任务实例ID private Long instanceId; /** diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java index 5297441f..c5de2c27 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java @@ -53,8 +53,6 @@ public class JobInfoDO { private Integer concurrency; // 任务整体超时时间 private Long instanceTimeLimit; - // 任务的每一个Task超时时间 - private Long taskTimeLimit; /* ************************** 重试配置 ************************** */ private Integer instanceRetryNum; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java index 857602e9..7aa4a073 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java @@ -81,7 +81,6 @@ public class DispatchService { req.setTimeExpressionType(TimeExpressionType.of(jobInfo.getTimeExpressionType()).name()); req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit()); - req.setTaskTimeoutMS(jobInfo.getTaskTimeLimit()); req.setThreadConcurrency(jobInfo.getConcurrency()); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java index d12abb43..f4f38bd3 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java @@ -138,6 +138,15 @@ public class InstanceManager { return statusHolder.getSourceAddress(); } + /** + * 获取任务的详细运行信息,包括当前运行状态、任务数量、TaskTracker地址等 + * @param instanceId 任务实例ID + * @return 任务实例详细运行信息 + */ + public static InstanceStatusHolder getInstanceDetail(Long instanceId) { + return instanceId2StatusHolder.get(instanceId); + } + private static ExecuteLogRepository getExecuteLogRepository() { while (executeLogRepository == null) { try { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceStatusHolder.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceStatusHolder.java index cd3e42cc..bc780072 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceStatusHolder.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceStatusHolder.java @@ -20,6 +20,8 @@ public class InstanceStatusHolder { private long succeedTaskNum; private long failedTaskNum; + // 任务开始时间 + private long startTime; // 上次上报时间 private long lastReportTime; // 源地址(TaskTracker 地址) diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java index 30503ee3..1261ad94 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java @@ -109,6 +109,7 @@ public class JobScheduleService { ExecuteLogDO executeLog = new ExecuteLogDO(); executeLog.setJobId(jobInfoDO.getId()); + executeLog.setAppId(jobInfoDO.getAppId()); executeLog.setInstanceId(IdGenerateService.allocate()); executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); executeLog.setExpectedTriggerTime(jobInfoDO.getNextTriggerTime()); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java index 69472c24..f1b43953 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java @@ -1,23 +1,26 @@ package com.github.kfcfans.oms.server.web.controller; import akka.actor.ActorSelection; +import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.request.ServerStopInstanceReq; import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; -import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; -import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; -import com.github.kfcfans.oms.server.service.ha.ServerSelectService; +import org.apache.commons.lang3.StringUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; +import java.util.Date; + +import static com.github.kfcfans.common.InstanceStatus.*; + /** * 任务实例 Controller * @@ -31,35 +34,49 @@ public class InstanceController { @Resource private ExecuteLogRepository executeLogRepository; @Resource - private JobInfoRepository jobInfoRepository; - @Resource private AppInfoRepository appInfoRepository; @GetMapping("/stop") - public ResultDTO stopInstance(Long instanceId) throws Exception { + public ResultDTO stopInstance(Long instanceId) { + + ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId); + if (executeLogDO == null) { + return ResultDTO.failed("invalid instanceId: " + instanceId); + } + // 更新数据库,将状态置为停止 + executeLogDO.setStatus(STOPPED.getV()); + executeLogDO.setGmtModified(new Date()); + executeLogDO.setFinishedTime(System.currentTimeMillis()); + executeLogDO.setResult("STOPPED_BY_USER"); + executeLogRepository.saveAndFlush(executeLogDO); + + // 获取Server地址,准备转发请求 + AppInfoDO appInfoDO = appInfoRepository.findById(executeLogDO.getAppId()).orElse(new AppInfoDO()); + if (StringUtils.isEmpty(appInfoDO.getCurrentServer())) { + return ResultDTO.failed("can't find server"); + } + + // 将请求转发给目标Server(HTTP -> AKKA) + ActorSelection serverActor = OhMyServer.getServerActor(appInfoDO.getCurrentServer()); + RedirectServerStopInstanceReq req = new RedirectServerStopInstanceReq(); + req.setServerStopInstanceReq(new ServerStopInstanceReq(instanceId)); + serverActor.tell(req, null); + return ResultDTO.success(null); + } + + @GetMapping("/status") + public ResultDTO getRunningStatus(Long instanceId) { - // 联级查询:instanceId -> jobId -> appId -> serverAddress ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId); if (executeLogDO == null) { return ResultDTO.failed("invalid instanceId: " + instanceId); } - JobInfoDO jobInfoDO = jobInfoRepository.findById(executeLogDO.getJobId()).orElseThrow(() -> { - throw new RuntimeException("impossible"); - }); + InstanceStatus status = InstanceStatus.of(executeLogDO.getStatus()); + if (status == FAILED || status == SUCCEED || status == STOPPED) { - AppInfoDO appInfoDO = appInfoRepository.findById(jobInfoDO.getAppId()).orElseThrow(() -> { - throw new RuntimeException("impossible"); - }); + } - String serverAddress = appInfoDO.getCurrentServer(); - - // 将请求转发给目标Server(HTTP -> AKKA) - ActorSelection serverActor = OhMyServer.getServerActor(serverAddress); - RedirectServerStopInstanceReq req = new RedirectServerStopInstanceReq(); - req.setServerStopInstanceReq(new ServerStopInstanceReq(instanceId)); - serverActor.tell(req, null); - - return ResultDTO.success(null); + return null; } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java index e9a113e7..34f8bbd5 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java @@ -71,6 +71,7 @@ public class JobController { ExecuteLogDO executeLog = new ExecuteLogDO(); executeLog.setJobId(jobInfoDO.getId()); + executeLog.setAppId(jobInfoDO.getAppId()); executeLog.setInstanceId(IdGenerateService.allocate()); executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); executeLog.setExpectedTriggerTime(System.currentTimeMillis()); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java index a5bdc954..e0df1ce8 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java @@ -1,8 +1,9 @@ package com.github.kfcfans.oms.worker.actors; import akka.actor.AbstractActor; +import com.github.kfcfans.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.common.request.ServerScheduleJobReq; -import com.github.kfcfans.oms.worker.core.tracker.task.CommonTaskTracker; +import com.github.kfcfans.common.request.ServerStopInstanceReq; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool; import com.github.kfcfans.oms.worker.persistence.TaskDO; @@ -33,6 +34,8 @@ public class TaskTrackerActor extends AbstractActor { .match(ProcessorMapTaskRequest.class, this::onReceiveProcessorMapTaskRequest) .match(ProcessorTrackerStatusReportReq.class, this::onReceiveProcessorTrackerStatusReportReq) .match(BroadcastTaskPreExecuteFinishedReq.class, this::onReceiveBroadcastTaskPreExecuteFinishedReq) + .match(ServerStopInstanceReq.class, this::onReceiveServerStopInstanceReq) + .match(ServerQueryInstanceStatusReq.class, this::onReceiveServerQueryInstanceStatusReq) .matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj)) .build(); } @@ -129,4 +132,27 @@ public class TaskTrackerActor extends AbstractActor { } taskTracker.receiveProcessorTrackerHeartbeat(req); } + + /** + * 停止任务实例 + */ + private void onReceiveServerStopInstanceReq(ServerStopInstanceReq req) { + TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId()); + if (taskTracker == null) { + log.warn("[TaskTrackerActor] receive ServerStopInstanceReq({}) but system can't find TaskTracker.", req); + return; + } + taskTracker.destroy(); + } + + /** + * 查询任务实例运行状态 + */ + private void onReceiveServerQueryInstanceStatusReq(ServerQueryInstanceStatusReq req) { + TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId()); + if (taskTracker == null) { + log.warn("[TaskTrackerActor] receive ServerQueryInstanceStatusReq({}) but system can't find TaskTracker.", req); + return; + } + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index 1be45e73..84d6fadc 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -104,22 +104,12 @@ public class ProcessorTracker { } } - /** - * 任务是否超时 - */ - public boolean isTimeout() { - return System.currentTimeMillis() - startTime > instanceInfo.getInstanceTimeoutMS(); - } - /** * 释放资源 */ public void destroy() { - // 1. 关闭定时线程池 - CommonUtils.executeIgnoreException(() -> timingPool.shutdownNow()); - - // 2. 关闭执行执行线程池 + // 1. 关闭执行执行线程池 CommonUtils.executeIgnoreException(() -> { List tasks = threadPool.shutdownNow(); if (!CollectionUtils.isEmpty(tasks)) { @@ -128,11 +118,14 @@ public class ProcessorTracker { return null; }); - // 3. 去除顶层引用,送入GC世界 + // 2. 去除顶层引用,送入GC世界 taskTrackerActorRef = null; ProcessorTrackerPool.removeProcessorTracker(instanceId); - log.info("[ProcessorTracker-{}] mission complete, ProcessorTracker already destroyed!", instanceId); + log.info("[ProcessorTracker-{}] ProcessorTracker already destroyed!", instanceId); + + // 3. 关闭定时线程池 + CommonUtils.executeIgnoreException(() -> timingPool.shutdownNow()); } @@ -162,22 +155,30 @@ public class ProcessorTracker { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-timing-pool-%d").build(); timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory); - timingPool.scheduleAtFixedRate(new TimingStatusReportRunnable(), 0, 10, TimeUnit.SECONDS); + timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS); } /** * 定时向 TaskTracker 汇报(携带任务执行信息的心跳) */ - private class TimingStatusReportRunnable implements Runnable { + private class CheckerAndReporter implements Runnable { @Override public void run() { + long interval = System.currentTimeMillis() - startTime; + if (interval > instanceInfo.getInstanceTimeoutMS()) { + log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", instanceId); + destroy(); + return; + } + long waitingNum = threadPool.getQueue().size(); ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceId, waitingNum); taskTrackerActorRef.tell(req, null); } + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java index 05884bed..0c1e0922 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java @@ -112,32 +112,36 @@ public class CommonTaskTracker extends TaskTracker { req.setSucceedTaskNum(holder.succeedNum); req.setFailedTaskNum(holder.failedNum); req.setReportTime(System.currentTimeMillis()); + req.setStartTime(createTime); req.setSourceAddress(OhMyWorker.getWorkerAddress()); + boolean success = false; + String result = null; // 2. 如果未完成任务数为0,判断是否真正结束,并获取真正结束任务的执行结果 - TaskDO resultTask = null; if (unfinishedNum == 0) { - boolean finishedBoolean = true; - // 数据库中一个任务都没有,说明根任务创建失败,该任务实例失败 if (finishedNum == 0) { - resultTask = new TaskDO(); - resultTask.setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue()); - resultTask.setResult("CREATE_ROOT_TASK_FAILED"); - + finished.set(true); + success = false; + result = "CREATE_ROOT_TASK_FAILED"; }else { ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); // STANDALONE 只有一个任务,完成即结束 if (executeType == ExecuteType.STANDALONE) { + finished.set(true); + List allTask = taskPersistenceService.getAllTask(instanceId, instanceId); if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) { + success = false; + result = "UNKNOWN BUG"; log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId); }else { - resultTask = allTask.get(0); + result = allTask.get(0).getResult(); + success = allTask.get(0).getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(); } } else { @@ -147,14 +151,18 @@ public class CommonTaskTracker extends TaskTracker { if (lastTaskOptional.isPresent()) { // 存在则根据 reduce 任务来判断状态 - resultTask = lastTaskOptional.get(); + TaskDO resultTask = lastTaskOptional.get(); TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus()); - finishedBoolean = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED; + + if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) { + finished.set(true); + success = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS; + result = resultTask.getResult(); + } + }else { // 不存在,代表前置任务刚刚执行完毕,需要创建 lastTask,最终任务必须在本机执行! - finishedBoolean = false; - TaskDO newLastTask = new TaskDO(); newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME); newLastTask.setTaskId(LAST_TASK_ID); @@ -164,19 +172,22 @@ public class CommonTaskTracker extends TaskTracker { } } } + } - - finished.set(finishedBoolean); + // 3. 检查任务实例整体是否超时 + if (isTimeout()) { + finished.set(true); + success = false; + result = "TIMEOUT"; } String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); - // 3. 执行完毕,报告服务器(第二个判断则是为了取消烦人的编译器警告) - if (finished.get() && resultTask != null) { + // 4. 执行完毕,报告服务器 + if (finished.get()) { - boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(); - req.setResult(resultTask.getResult()); + req.setResult(result); req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV()); CompletionStage askCS = Patterns.ask(serverActor, req, Duration.ofMillis(TIME_OUT_MS)); @@ -186,7 +197,7 @@ public class CommonTaskTracker extends TaskTracker { AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(TIME_OUT_MS, TimeUnit.MILLISECONDS); serverAccepted = askResponse.isSuccess(); }catch (Exception e) { - log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, resultTask.getResult()); + log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result); } // 服务器未接受上报,则等待下次重新上报 @@ -196,17 +207,17 @@ public class CommonTaskTracker extends TaskTracker { // 服务器已经更新状态,任务已经执行完毕,开始释放所有资源 log.info("[TaskTracker-{}] instance(jobId={}) process finished,result = {}, start to release resource...", - instanceId, instanceInfo.getJobId(), resultTask.getResult()); + instanceId, instanceInfo.getJobId(), result); destroy(); return; } - // 4. 未完成,上报状态 + // 5. 未完成,上报状态 req.setInstanceStatus(InstanceStatus.RUNNING.getV()); serverActor.tell(req, null); - // 5.1 定期检查 -> 重试派发后未确认的任务 + // 6.1 定期检查 -> 重试派发后未确认的任务 long currentMS = System.currentTimeMillis(); if (holder.workerUnreceivedNum != 0) { taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> { @@ -230,14 +241,14 @@ public class CommonTaskTracker extends TaskTracker { }); } - // 5.2 定期检查 -> 重新执行被派发到宕机ProcessorTracker上的任务 + // 6.2 定期检查 -> 重新执行被派发到宕机ProcessorTracker上的任务 List disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers(); if (!disconnectedPTs.isEmpty()) { log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs); taskPersistenceService.updateLostTasks(disconnectedPTs); } - // 5.2 超时检查 -> 等待执行/执行中的任务(要不要采取 Worker不挂不行动准则,Worker挂了再重新派发任务) + // 6.3 超时检查 -> 检查超时的Task } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java index 9c2c3299..3403da93 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java @@ -264,12 +264,12 @@ public class FrequentTaskTracker extends TaskTracker { req.setJobId(instanceInfo.getJobId()); req.setInstanceId(instanceId); req.setReportTime(System.currentTimeMillis()); + req.setStartTime(createTime); req.setInstanceStatus(InstanceStatus.RUNNING.getV()); req.setTotalTaskNum(triggerTimes.get()); req.setSucceedTaskNum(succeedTimes.get()); req.setFailedTaskNum(failedTimes.get()); - req.setReportTime(System.currentTimeMillis()); req.setSourceAddress(OhMyWorker.getWorkerAddress()); String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index 9b1cae26..eaf0ff77 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -54,7 +54,7 @@ public abstract class TaskTracker { // 定时任务线程池 protected ScheduledExecutorService scheduledPool; // 是否结束 - protected AtomicBoolean finished = new AtomicBoolean(false); + protected AtomicBoolean finished; protected TaskTracker(ServerScheduleJobReq req) { @@ -65,6 +65,7 @@ public abstract class TaskTracker { BeanUtils.copyProperties(req, instanceInfo); this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress()); this.taskPersistenceService = TaskPersistenceService.INSTANCE; + this.finished = new AtomicBoolean(false); // 子类自定义初始化操作 initTaskTracker(req); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java index 5fa0f504..cefc171f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java @@ -34,8 +34,6 @@ public class InstanceInfo implements Serializable { */ // 整个任务的总体超时时间 private long instanceTimeoutMS; - // Task的超时时间 - private long taskTimeoutMS; /** * 任务运行参数