remove taskTimeLimit because I can't stop the running thread.

This commit is contained in:
tjq 2020-04-10 13:25:00 +08:00
parent a42bc496df
commit 3b331e70cc
20 changed files with 207 additions and 74 deletions

View File

@ -1,5 +1,7 @@
package com.github.kfcfans.common; package com.github.kfcfans.common;
import java.time.Duration;
/** /**
* RemoteConstant * RemoteConstant
* *
@ -30,4 +32,5 @@ public class RemoteConstant {
/* ************************ OTHERS ************************ */ /* ************************ OTHERS ************************ */
public static final String EMPTY_ADDRESS = "N/A"; public static final String EMPTY_ADDRESS = "N/A";
public static final long DEFAULT_TIMEOUT_MS = 5000;
} }

View File

@ -0,0 +1,16 @@
package com.github.kfcfans.common.request;
import lombok.Data;
import java.io.Serializable;
/**
* Request sent by the server to query the running status of an instance;
* the reply is expected to carry detailed runtime data.
*
* @author tjq
* @since 2020/4/10
*/
@Data
public class ServerQueryInstanceStatusReq implements Serializable {
// ID of the instance whose status is being queried
private Long instanceId;
}

View File

@ -41,8 +41,6 @@ public class ServerScheduleJobReq implements Serializable {
*/ */
// 整个任务的总体超时时间 // 整个任务的总体超时时间
private long instanceTimeoutMS; private long instanceTimeoutMS;
// Task的超时时间
private long taskTimeoutMS;
/** /**
* 任务运行参数 * 任务运行参数

View File

@ -25,6 +25,7 @@ public class TaskTrackerReportInstanceStatusReq implements Serializable {
private long succeedTaskNum; private long succeedTaskNum;
private long failedTaskNum; private long failedTaskNum;
private long startTime;
private long reportTime; private long reportTime;
private String sourceAddress; private String sourceAddress;
} }

View File

@ -1,17 +1,26 @@
package com.github.kfcfans.oms.server.akka.actors; package com.github.kfcfans.oms.server.akka.actors;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.pattern.Patterns;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.request.WorkerHeartbeat; import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.requests.Ping; import com.github.kfcfans.oms.server.akka.requests.Ping;
import com.github.kfcfans.oms.server.akka.requests.RedirectServerQueryInstanceStatusReq;
import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq; import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static com.github.kfcfans.common.RemoteConstant.DEFAULT_TIMEOUT_MS;
/** /**
* 处理朋友们的信息处理服务器与服务器之间的通讯 * 处理朋友们的信息处理服务器与服务器之间的通讯
* *
@ -25,13 +34,12 @@ public class FriendActor extends AbstractActor {
return receiveBuilder() return receiveBuilder()
.match(Ping.class, this::onReceivePing) .match(Ping.class, this::onReceivePing)
.match(RedirectServerStopInstanceReq.class, this::onReceiveRedirectServerStopInstanceReq) .match(RedirectServerStopInstanceReq.class, this::onReceiveRedirectServerStopInstanceReq)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) .matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj))
.build(); .build();
} }
/** /**
* 处理存活检测的请求 * 处理存活检测的请求
* @param ping 存活检测请求
*/ */
private void onReceivePing(Ping ping) { private void onReceivePing(Ping ping) {
AskResponse askResponse = new AskResponse(); AskResponse askResponse = new AskResponse();
@ -42,7 +50,6 @@ public class FriendActor extends AbstractActor {
/** /**
* 处理停止任务实例的请求 * 处理停止任务实例的请求
* @param req 停止运行任务实例
*/ */
private void onReceiveRedirectServerStopInstanceReq(RedirectServerStopInstanceReq req) { private void onReceiveRedirectServerStopInstanceReq(RedirectServerStopInstanceReq req) {
@ -58,4 +65,29 @@ public class FriendActor extends AbstractActor {
// 可能刚经历 Server 变更 TaskTracker 宕机先忽略吧打条日志压压惊 // 可能刚经历 Server 变更 TaskTracker 宕机先忽略吧打条日志压压惊
log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}), so stop instance may fail.", instanceId); log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}), so stop instance may fail.", instanceId);
} }
/**
 * Handles a redirected server request to query the running status of an instance.
 *
 * <p>Looks up the TaskTracker address for the instance, forwards the wrapped query to
 * that TaskTracker and relays its answer to the sender. A failed {@link AskResponse}
 * (with the reason in {@code extra}) is sent back when the address is unknown or the
 * ask fails.
 *
 * <p>Note: the ask is awaited synchronously, so this handler blocks for up to
 * {@code DEFAULT_TIMEOUT_MS} milliseconds while waiting for the TaskTracker.
 *
 * @param req redirected request wrapping the original status query
 */
private void onReceiveRedirectServerQueryInstanceStatusReq(RedirectServerQueryInstanceStatusReq req) {
    Long instanceId = req.getReq().getInstanceId();
    String taskTrackerAddress = InstanceManager.getTaskTrackerAddress(instanceId);
    AskResponse response = new AskResponse();

    if (StringUtils.isEmpty(taskTrackerAddress)) {
        response.setSuccess(false);
        response.setExtra("can't find TaskTracker");
        log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}).", instanceId);
    } else {
        try {
            CompletionStage<Object> ask = Patterns.ask(OhMyServer.getTaskTrackerActor(taskTrackerAddress), req.getReq(), Duration.ofMillis(DEFAULT_TIMEOUT_MS));
            response = (AskResponse) ask.toCompletableFuture().get(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
            log.warn("[FriendActor] Ask TaskTracker for instance(instanceId={}) status failed.", instanceId, e);
            response.setSuccess(false);
            // Some exceptions (e.g. TimeoutException) carry no message; fall back to the
            // exception's string form so the caller always receives a failure reason.
            String reason = e.getMessage();
            response.setExtra(reason != null ? reason : e.toString());
        }
    }

    getSender().tell(response, getSelf());
}
} }

View File

@ -0,0 +1,17 @@
package com.github.kfcfans.oms.server.akka.requests;
import com.github.kfcfans.common.request.ServerQueryInstanceStatusReq;
import lombok.Data;
import java.io.Serializable;
/**
* Redirect wrapper for a ServerQueryInstanceStatusReq.
*
* @author tjq
* @since 2020/4/10
*/
@Data
public class RedirectServerQueryInstanceStatusReq implements Serializable {
// the original status query being redirected
private ServerQueryInstanceStatusReq req;
}

View File

@ -22,6 +22,8 @@ public class ExecuteLogDO {
// 任务ID // 任务ID
private Long jobId; private Long jobId;
// 任务所属应用的ID冗余提高查询效率
private Long appId;
// 任务实例ID // 任务实例ID
private Long instanceId; private Long instanceId;
/** /**

View File

@ -53,8 +53,6 @@ public class JobInfoDO {
private Integer concurrency; private Integer concurrency;
// 任务整体超时时间 // 任务整体超时时间
private Long instanceTimeLimit; private Long instanceTimeLimit;
// 任务的每一个Task超时时间
private Long taskTimeLimit;
/* ************************** 重试配置 ************************** */ /* ************************** 重试配置 ************************** */
private Integer instanceRetryNum; private Integer instanceRetryNum;

View File

@ -81,7 +81,6 @@ public class DispatchService {
req.setTimeExpressionType(TimeExpressionType.of(jobInfo.getTimeExpressionType()).name()); req.setTimeExpressionType(TimeExpressionType.of(jobInfo.getTimeExpressionType()).name());
req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit()); req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit());
req.setTaskTimeoutMS(jobInfo.getTaskTimeLimit());
req.setThreadConcurrency(jobInfo.getConcurrency()); req.setThreadConcurrency(jobInfo.getConcurrency());

View File

@ -138,6 +138,15 @@ public class InstanceManager {
return statusHolder.getSourceAddress(); return statusHolder.getSourceAddress();
} }
/**
 * Fetches the detailed runtime information of an instance (per the original note:
 * current status, task counts, TaskTracker address, etc.).
 *
 * @param instanceId ID of the job instance
 * @return whatever {@code instanceId2StatusHolder} holds for that ID
 */
public static InstanceStatusHolder getInstanceDetail(Long instanceId) {
    final InstanceStatusHolder detail = instanceId2StatusHolder.get(instanceId);
    return detail;
}
private static ExecuteLogRepository getExecuteLogRepository() { private static ExecuteLogRepository getExecuteLogRepository() {
while (executeLogRepository == null) { while (executeLogRepository == null) {
try { try {

View File

@ -20,6 +20,8 @@ public class InstanceStatusHolder {
private long succeedTaskNum; private long succeedTaskNum;
private long failedTaskNum; private long failedTaskNum;
// 任务开始时间
private long startTime;
// 上次上报时间 // 上次上报时间
private long lastReportTime; private long lastReportTime;
// 源地址TaskTracker 地址 // 源地址TaskTracker 地址

View File

@ -109,6 +109,7 @@ public class JobScheduleService {
ExecuteLogDO executeLog = new ExecuteLogDO(); ExecuteLogDO executeLog = new ExecuteLogDO();
executeLog.setJobId(jobInfoDO.getId()); executeLog.setJobId(jobInfoDO.getId());
executeLog.setAppId(jobInfoDO.getAppId());
executeLog.setInstanceId(IdGenerateService.allocate()); executeLog.setInstanceId(IdGenerateService.allocate());
executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
executeLog.setExpectedTriggerTime(jobInfoDO.getNextTriggerTime()); executeLog.setExpectedTriggerTime(jobInfoDO.getNextTriggerTime());

View File

@ -1,23 +1,26 @@
package com.github.kfcfans.oms.server.web.controller; package com.github.kfcfans.oms.server.web.controller;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.request.ServerStopInstanceReq; import com.github.kfcfans.common.request.ServerStopInstanceReq;
import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq; import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import org.apache.commons.lang3.StringUtils;
import com.github.kfcfans.oms.server.service.ha.ServerSelectService;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Date;
import static com.github.kfcfans.common.InstanceStatus.*;
/** /**
* 任务实例 Controller * 任务实例 Controller
* *
@ -31,35 +34,49 @@ public class InstanceController {
@Resource @Resource
private ExecuteLogRepository executeLogRepository; private ExecuteLogRepository executeLogRepository;
@Resource @Resource
private JobInfoRepository jobInfoRepository;
@Resource
private AppInfoRepository appInfoRepository; private AppInfoRepository appInfoRepository;
@GetMapping("/stop") @GetMapping("/stop")
public ResultDTO<Void> stopInstance(Long instanceId) throws Exception { public ResultDTO<Void> stopInstance(Long instanceId) {
ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId);
if (executeLogDO == null) {
return ResultDTO.failed("invalid instanceId: " + instanceId);
}
// 更新数据库将状态置为停止
executeLogDO.setStatus(STOPPED.getV());
executeLogDO.setGmtModified(new Date());
executeLogDO.setFinishedTime(System.currentTimeMillis());
executeLogDO.setResult("STOPPED_BY_USER");
executeLogRepository.saveAndFlush(executeLogDO);
// 获取Server地址准备转发请求
AppInfoDO appInfoDO = appInfoRepository.findById(executeLogDO.getAppId()).orElse(new AppInfoDO());
if (StringUtils.isEmpty(appInfoDO.getCurrentServer())) {
return ResultDTO.failed("can't find server");
}
// 将请求转发给目标ServerHTTP -> AKKA
ActorSelection serverActor = OhMyServer.getServerActor(appInfoDO.getCurrentServer());
RedirectServerStopInstanceReq req = new RedirectServerStopInstanceReq();
req.setServerStopInstanceReq(new ServerStopInstanceReq(instanceId));
serverActor.tell(req, null);
return ResultDTO.success(null);
}
@GetMapping("/status")
public ResultDTO<Void> getRunningStatus(Long instanceId) {
// 联级查询instanceId -> jobId -> appId -> serverAddress
ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId); ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId);
if (executeLogDO == null) { if (executeLogDO == null) {
return ResultDTO.failed("invalid instanceId: " + instanceId); return ResultDTO.failed("invalid instanceId: " + instanceId);
} }
JobInfoDO jobInfoDO = jobInfoRepository.findById(executeLogDO.getJobId()).orElseThrow(() -> { InstanceStatus status = InstanceStatus.of(executeLogDO.getStatus());
throw new RuntimeException("impossible"); if (status == FAILED || status == SUCCEED || status == STOPPED) {
});
AppInfoDO appInfoDO = appInfoRepository.findById(jobInfoDO.getAppId()).orElseThrow(() -> { }
throw new RuntimeException("impossible");
});
String serverAddress = appInfoDO.getCurrentServer(); return null;
// 将请求转发给目标ServerHTTP -> AKKA
ActorSelection serverActor = OhMyServer.getServerActor(serverAddress);
RedirectServerStopInstanceReq req = new RedirectServerStopInstanceReq();
req.setServerStopInstanceReq(new ServerStopInstanceReq(instanceId));
serverActor.tell(req, null);
return ResultDTO.success(null);
} }
} }

View File

@ -71,6 +71,7 @@ public class JobController {
ExecuteLogDO executeLog = new ExecuteLogDO(); ExecuteLogDO executeLog = new ExecuteLogDO();
executeLog.setJobId(jobInfoDO.getId()); executeLog.setJobId(jobInfoDO.getId());
executeLog.setAppId(jobInfoDO.getAppId());
executeLog.setInstanceId(IdGenerateService.allocate()); executeLog.setInstanceId(IdGenerateService.allocate());
executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
executeLog.setExpectedTriggerTime(System.currentTimeMillis()); executeLog.setExpectedTriggerTime(System.currentTimeMillis());

View File

@ -1,8 +1,9 @@
package com.github.kfcfans.oms.worker.actors; package com.github.kfcfans.oms.worker.actors;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import com.github.kfcfans.common.request.ServerQueryInstanceStatusReq;
import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.oms.worker.core.tracker.task.CommonTaskTracker; import com.github.kfcfans.common.request.ServerStopInstanceReq;
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool;
import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.persistence.TaskDO;
@ -33,6 +34,8 @@ public class TaskTrackerActor extends AbstractActor {
.match(ProcessorMapTaskRequest.class, this::onReceiveProcessorMapTaskRequest) .match(ProcessorMapTaskRequest.class, this::onReceiveProcessorMapTaskRequest)
.match(ProcessorTrackerStatusReportReq.class, this::onReceiveProcessorTrackerStatusReportReq) .match(ProcessorTrackerStatusReportReq.class, this::onReceiveProcessorTrackerStatusReportReq)
.match(BroadcastTaskPreExecuteFinishedReq.class, this::onReceiveBroadcastTaskPreExecuteFinishedReq) .match(BroadcastTaskPreExecuteFinishedReq.class, this::onReceiveBroadcastTaskPreExecuteFinishedReq)
.match(ServerStopInstanceReq.class, this::onReceiveServerStopInstanceReq)
.match(ServerQueryInstanceStatusReq.class, this::onReceiveServerQueryInstanceStatusReq)
.matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj)) .matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj))
.build(); .build();
} }
@ -129,4 +132,27 @@ public class TaskTrackerActor extends AbstractActor {
} }
taskTracker.receiveProcessorTrackerHeartbeat(req); taskTracker.receiveProcessorTrackerHeartbeat(req);
} }
/**
 * Stops a running instance: destroys the TaskTracker registered for the requested
 * instance ID, or logs a warning when no such TaskTracker is found.
 */
private void onReceiveServerStopInstanceReq(ServerStopInstanceReq req) {
    TaskTracker target = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
    if (target != null) {
        target.destroy();
        return;
    }
    log.warn("[TaskTrackerActor] receive ServerStopInstanceReq({}) but system can't find TaskTracker.", req);
}
/**
* Queries the running status of an instance.
* Looks up the TaskTracker for the requested instance ID and logs a warning when none is found.
*/
private void onReceiveServerQueryInstanceStatusReq(ServerQueryInstanceStatusReq req) {
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive ServerQueryInstanceStatusReq({}) but system can't find TaskTracker.", req);
return;
}
// NOTE(review): nothing is read from taskTracker and no reply is sent to the sender on
// either path, so a caller that asks for a response would presumably time out — looks
// unfinished; confirm.
}
} }

View File

@ -104,22 +104,12 @@ public class ProcessorTracker {
} }
} }
/**
* 任务是否超时
*/
public boolean isTimeout() {
return System.currentTimeMillis() - startTime > instanceInfo.getInstanceTimeoutMS();
}
/** /**
* 释放资源 * 释放资源
*/ */
public void destroy() { public void destroy() {
// 1. 关闭定时线程池 // 1. 关闭执行执行线程池
CommonUtils.executeIgnoreException(() -> timingPool.shutdownNow());
// 2. 关闭执行执行线程池
CommonUtils.executeIgnoreException(() -> { CommonUtils.executeIgnoreException(() -> {
List<Runnable> tasks = threadPool.shutdownNow(); List<Runnable> tasks = threadPool.shutdownNow();
if (!CollectionUtils.isEmpty(tasks)) { if (!CollectionUtils.isEmpty(tasks)) {
@ -128,11 +118,14 @@ public class ProcessorTracker {
return null; return null;
}); });
// 3. 去除顶层引用送入GC世界 // 2. 去除顶层引用送入GC世界
taskTrackerActorRef = null; taskTrackerActorRef = null;
ProcessorTrackerPool.removeProcessorTracker(instanceId); ProcessorTrackerPool.removeProcessorTracker(instanceId);
log.info("[ProcessorTracker-{}] mission complete, ProcessorTracker already destroyed!", instanceId); log.info("[ProcessorTracker-{}] ProcessorTracker already destroyed!", instanceId);
// 3. 关闭定时线程池
CommonUtils.executeIgnoreException(() -> timingPool.shutdownNow());
} }
@ -162,22 +155,30 @@ public class ProcessorTracker {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-timing-pool-%d").build(); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-timing-pool-%d").build();
timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory); timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);
timingPool.scheduleAtFixedRate(new TimingStatusReportRunnable(), 0, 10, TimeUnit.SECONDS); timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS);
} }
/** /**
* 定时向 TaskTracker 汇报携带任务执行信息的心跳 * 定时向 TaskTracker 汇报携带任务执行信息的心跳
*/ */
private class TimingStatusReportRunnable implements Runnable { private class CheckerAndReporter implements Runnable {
@Override @Override
public void run() { public void run() {
long interval = System.currentTimeMillis() - startTime;
if (interval > instanceInfo.getInstanceTimeoutMS()) {
log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", instanceId);
destroy();
return;
}
long waitingNum = threadPool.getQueue().size(); long waitingNum = threadPool.getQueue().size();
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceId, waitingNum); ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceId, waitingNum);
taskTrackerActorRef.tell(req, null); taskTrackerActorRef.tell(req, null);
} }
} }
} }

View File

@ -112,32 +112,36 @@ public class CommonTaskTracker extends TaskTracker {
req.setSucceedTaskNum(holder.succeedNum); req.setSucceedTaskNum(holder.succeedNum);
req.setFailedTaskNum(holder.failedNum); req.setFailedTaskNum(holder.failedNum);
req.setReportTime(System.currentTimeMillis()); req.setReportTime(System.currentTimeMillis());
req.setStartTime(createTime);
req.setSourceAddress(OhMyWorker.getWorkerAddress()); req.setSourceAddress(OhMyWorker.getWorkerAddress());
boolean success = false;
String result = null;
// 2. 如果未完成任务数为0判断是否真正结束并获取真正结束任务的执行结果 // 2. 如果未完成任务数为0判断是否真正结束并获取真正结束任务的执行结果
TaskDO resultTask = null;
if (unfinishedNum == 0) { if (unfinishedNum == 0) {
boolean finishedBoolean = true;
// 数据库中一个任务都没有说明根任务创建失败该任务实例失败 // 数据库中一个任务都没有说明根任务创建失败该任务实例失败
if (finishedNum == 0) { if (finishedNum == 0) {
resultTask = new TaskDO(); finished.set(true);
resultTask.setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue()); success = false;
resultTask.setResult("CREATE_ROOT_TASK_FAILED"); result = "CREATE_ROOT_TASK_FAILED";
}else { }else {
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
// STANDALONE 只有一个任务完成即结束 // STANDALONE 只有一个任务完成即结束
if (executeType == ExecuteType.STANDALONE) { if (executeType == ExecuteType.STANDALONE) {
finished.set(true);
List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId); List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) { if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
success = false;
result = "UNKNOWN BUG";
log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId); log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
}else { }else {
resultTask = allTask.get(0); result = allTask.get(0).getResult();
success = allTask.get(0).getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
} }
} else { } else {
@ -147,14 +151,18 @@ public class CommonTaskTracker extends TaskTracker {
if (lastTaskOptional.isPresent()) { if (lastTaskOptional.isPresent()) {
// 存在则根据 reduce 任务来判断状态 // 存在则根据 reduce 任务来判断状态
resultTask = lastTaskOptional.get(); TaskDO resultTask = lastTaskOptional.get();
TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus()); TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());
finishedBoolean = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED;
if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
finished.set(true);
success = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS;
result = resultTask.getResult();
}
}else { }else {
// 不存在代表前置任务刚刚执行完毕需要创建 lastTask最终任务必须在本机执行 // 不存在代表前置任务刚刚执行完毕需要创建 lastTask最终任务必须在本机执行
finishedBoolean = false;
TaskDO newLastTask = new TaskDO(); TaskDO newLastTask = new TaskDO();
newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME); newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
newLastTask.setTaskId(LAST_TASK_ID); newLastTask.setTaskId(LAST_TASK_ID);
@ -164,19 +172,22 @@ public class CommonTaskTracker extends TaskTracker {
} }
} }
} }
}
// 3. 检查任务实例整体是否超时
finished.set(finishedBoolean); if (isTimeout()) {
finished.set(true);
success = false;
result = "TIMEOUT";
} }
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
// 3. 执行完毕报告服务器第二个判断则是为了取消烦人的编译器警告 // 4. 执行完毕报告服务器
if (finished.get() && resultTask != null) { if (finished.get()) {
boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(); req.setResult(result);
req.setResult(resultTask.getResult());
req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV()); req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
CompletionStage<Object> askCS = Patterns.ask(serverActor, req, Duration.ofMillis(TIME_OUT_MS)); CompletionStage<Object> askCS = Patterns.ask(serverActor, req, Duration.ofMillis(TIME_OUT_MS));
@ -186,7 +197,7 @@ public class CommonTaskTracker extends TaskTracker {
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(TIME_OUT_MS, TimeUnit.MILLISECONDS); AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(TIME_OUT_MS, TimeUnit.MILLISECONDS);
serverAccepted = askResponse.isSuccess(); serverAccepted = askResponse.isSuccess();
}catch (Exception e) { }catch (Exception e) {
log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, resultTask.getResult()); log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result);
} }
// 服务器未接受上报则等待下次重新上报 // 服务器未接受上报则等待下次重新上报
@ -196,17 +207,17 @@ public class CommonTaskTracker extends TaskTracker {
// 服务器已经更新状态任务已经执行完毕开始释放所有资源 // 服务器已经更新状态任务已经执行完毕开始释放所有资源
log.info("[TaskTracker-{}] instance(jobId={}) process finished,result = {}, start to release resource...", log.info("[TaskTracker-{}] instance(jobId={}) process finished,result = {}, start to release resource...",
instanceId, instanceInfo.getJobId(), resultTask.getResult()); instanceId, instanceInfo.getJobId(), result);
destroy(); destroy();
return; return;
} }
// 4. 未完成上报状态 // 5. 未完成上报状态
req.setInstanceStatus(InstanceStatus.RUNNING.getV()); req.setInstanceStatus(InstanceStatus.RUNNING.getV());
serverActor.tell(req, null); serverActor.tell(req, null);
// 5.1 定期检查 -> 重试派发后未确认的任务 // 6.1 定期检查 -> 重试派发后未确认的任务
long currentMS = System.currentTimeMillis(); long currentMS = System.currentTimeMillis();
if (holder.workerUnreceivedNum != 0) { if (holder.workerUnreceivedNum != 0) {
taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> { taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {
@ -230,14 +241,14 @@ public class CommonTaskTracker extends TaskTracker {
}); });
} }
// 5.2 定期检查 -> 重新执行被派发到宕机ProcessorTracker上的任务 // 6.2 定期检查 -> 重新执行被派发到宕机ProcessorTracker上的任务
List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers(); List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
if (!disconnectedPTs.isEmpty()) { if (!disconnectedPTs.isEmpty()) {
log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs); log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
taskPersistenceService.updateLostTasks(disconnectedPTs); taskPersistenceService.updateLostTasks(disconnectedPTs);
} }
// 5.2 超时检查 -> 等待执行/执行中的任务要不要采取 Worker不挂不行动准则Worker挂了再重新派发任务 // 6.3 超时检查 -> 检查超时的Task
} }

View File

@ -264,12 +264,12 @@ public class FrequentTaskTracker extends TaskTracker {
req.setJobId(instanceInfo.getJobId()); req.setJobId(instanceInfo.getJobId());
req.setInstanceId(instanceId); req.setInstanceId(instanceId);
req.setReportTime(System.currentTimeMillis()); req.setReportTime(System.currentTimeMillis());
req.setStartTime(createTime);
req.setInstanceStatus(InstanceStatus.RUNNING.getV()); req.setInstanceStatus(InstanceStatus.RUNNING.getV());
req.setTotalTaskNum(triggerTimes.get()); req.setTotalTaskNum(triggerTimes.get());
req.setSucceedTaskNum(succeedTimes.get()); req.setSucceedTaskNum(succeedTimes.get());
req.setFailedTaskNum(failedTimes.get()); req.setFailedTaskNum(failedTimes.get());
req.setReportTime(System.currentTimeMillis());
req.setSourceAddress(OhMyWorker.getWorkerAddress()); req.setSourceAddress(OhMyWorker.getWorkerAddress());
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);

View File

@ -54,7 +54,7 @@ public abstract class TaskTracker {
// 定时任务线程池 // 定时任务线程池
protected ScheduledExecutorService scheduledPool; protected ScheduledExecutorService scheduledPool;
// 是否结束 // 是否结束
protected AtomicBoolean finished = new AtomicBoolean(false); protected AtomicBoolean finished;
protected TaskTracker(ServerScheduleJobReq req) { protected TaskTracker(ServerScheduleJobReq req) {
@ -65,6 +65,7 @@ public abstract class TaskTracker {
BeanUtils.copyProperties(req, instanceInfo); BeanUtils.copyProperties(req, instanceInfo);
this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress()); this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress());
this.taskPersistenceService = TaskPersistenceService.INSTANCE; this.taskPersistenceService = TaskPersistenceService.INSTANCE;
this.finished = new AtomicBoolean(false);
// 子类自定义初始化操作 // 子类自定义初始化操作
initTaskTracker(req); initTaskTracker(req);

View File

@ -34,8 +34,6 @@ public class InstanceInfo implements Serializable {
*/ */
// 整个任务的总体超时时间 // 整个任务的总体超时时间
private long instanceTimeoutMS; private long instanceTimeoutMS;
// Task的超时时间
private long taskTimeoutMS;
/** /**
* 任务运行参数 * 任务运行参数