prepare to go to my brother's birthday party

This commit is contained in:
tjq 2020-04-09 16:50:46 +08:00
parent e99292f027
commit a42bc496df
23 changed files with 311 additions and 79 deletions

View File

@ -24,6 +24,7 @@ public class RemoteConstant {
public static final String SERVER_ACTOR_SYSTEM_NAME = "oms-server";
public static final String SERVER_ACTOR_NAME = "server_actor";
public static final String SERVER_FRIEND_ACTOR_NAME = "friend_actor";
public static final String SERVER_AKKA_CONFIG_NAME = "oms-server.akka.conf";

View File

@ -65,4 +65,7 @@ public class ServerScheduleJobReq implements Serializable {
// 时间表达式CRON/NULL/LONG/LONG单位MS
private String timeExpression;
// 最大同时运行任务数默认 1
private Integer maxInstanceNum;
}

View File

@ -0,0 +1,20 @@
package com.github.kfcfans.common.request;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 服务器要求任务实例停止执行请求
*
* @author tjq
* @since 2020/4/9
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerStopInstanceReq implements Serializable {
private Long instanceId;
}

View File

@ -26,4 +26,5 @@ public class TaskTrackerReportInstanceStatusReq implements Serializable {
private long failedTaskNum;
private long reportTime;
private String sourceAddress;
}

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.oms.server;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

View File

@ -1,10 +1,12 @@
package com.github.kfcfans.oms.server.core.akka;
package com.github.kfcfans.oms.server.akka;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.server.akka.actors.FriendActor;
import com.github.kfcfans.oms.server.akka.actors.ServerActor;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@ -47,6 +49,7 @@ public class OhMyServer {
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME);
actorSystem.actorOf(Props.create(FriendActor.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
log.info("[OhMyServer] OhMyServer's akka system start successfully, using time {}.", stopwatch);
}
@ -57,7 +60,7 @@ public class OhMyServer {
* @return ActorSelection
*/
public static ActorSelection getServerActor(String address) {
String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_ACTOR_NAME);
String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
return actorSystem.actorSelection(path);
}

View File

@ -0,0 +1,61 @@
package com.github.kfcfans.oms.server.akka.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.requests.Ping;
import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
/**
* 处理朋友们的信息处理服务器与服务器之间的通讯
*
* @author tjq
* @since 2020/4/9
*/
@Slf4j
public class FriendActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Ping.class, this::onReceivePing)
.match(RedirectServerStopInstanceReq.class, this::onReceiveRedirectServerStopInstanceReq)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build();
}
/**
* 处理存活检测的请求
* @param ping 存活检测请求
*/
private void onReceivePing(Ping ping) {
AskResponse askResponse = new AskResponse();
askResponse.setSuccess(true);
askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime());
getSender().tell(askResponse, getSelf());
}
/**
* 处理停止任务实例的请求
* @param req 停止运行任务实例
*/
private void onReceiveRedirectServerStopInstanceReq(RedirectServerStopInstanceReq req) {
Long instanceId = req.getServerStopInstanceReq().getInstanceId();
String taskTrackerAddress = InstanceManager.getTaskTrackerAddress(instanceId);
// 非空发请求停止任务实例
if (StringUtils.isNotEmpty(taskTrackerAddress)) {
OhMyServer.getTaskTrackerActor(taskTrackerAddress).tell(req.getServerStopInstanceReq(), getSelf());
return;
}
// 可能刚经历 Server 变更 TaskTracker 宕机先忽略吧打条日志压压惊
log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}), so stop instance may fail.", instanceId);
}
}

View File

@ -1,10 +1,11 @@
package com.github.kfcfans.oms.server.core.akka;
package com.github.kfcfans.oms.server.akka.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.server.core.InstanceManager;
import com.github.kfcfans.oms.server.akka.requests.Ping;
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import lombok.extern.slf4j.Slf4j;
@ -22,21 +23,11 @@ public class ServerActor extends AbstractActor {
return receiveBuilder()
.match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat)
.match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq)
.match(Ping.class, this::onReceivePing)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build();
}
/**
* 处理存活检测的请求
* @param ping 存活检测请求
*/
private void onReceivePing(Ping ping) {
AskResponse askResponse = new AskResponse();
askResponse.setSuccess(true);
askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime());
getSender().tell(askResponse, getSelf());
}
/**
* 处理 Worker 的心跳请求

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.server.core.akka;
package com.github.kfcfans.oms.server.akka.requests;
import lombok.Data;

View File

@ -0,0 +1,18 @@
package com.github.kfcfans.oms.server.akka.requests;
import com.github.kfcfans.common.request.ServerStopInstanceReq;
import lombok.Data;
import java.io.Serializable;
/**
* 重定向 ServerStopInstanceReq
* 被HTTP请求停止任务实例的机器需要将请求转发到该实例对应的Server上处理由该Server下发到Worker只有该Server持有Worker的地址信息
*
* @author tjq
* @since 2020/4/9
*/
@Data
public class RedirectServerStopInstanceReq implements Serializable {
private ServerStopInstanceReq serverStopInstanceReq;
}

View File

@ -19,4 +19,12 @@ public enum JobStatus {
private int v;
public static JobStatus of(int v) {
for (JobStatus type : values()) {
if (type.v == v) {
return type;
}
}
throw new IllegalArgumentException("unknown JobStatus of " + v);
}
}

View File

@ -37,7 +37,7 @@ public class ExecuteLogDO {
// 结束时间
private Long finishedTime;
// 总共执行的次数CRON任务 -> 代表重试次数FREQUENT -> 代表总执行次数
// 总共执行的次数用于重试判断
private Long runningTimes;
private Date gmtCreate;

View File

@ -5,7 +5,7 @@ import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;

View File

@ -3,8 +3,8 @@ package com.github.kfcfans.oms.server.service.ha;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.oms.server.core.akka.Ping;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.requests.Ping;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.service.lock.LockService;

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.server.core;
package com.github.kfcfans.oms.server.service.instance;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
@ -119,11 +119,25 @@ public class InstanceManager {
// 清除已完成的实例信息
if (finished) {
instanceId2JobInfo.remove(instanceId);
instanceId2StatusHolder.remove(instanceId);
// 这一步也可能导致后面取不到 JobInfoDO
instanceId2JobInfo.remove(instanceId);
}
}
/**
* 获取某个任务实例对应的 TaskTracker 地址
* @param instanceId 任务实例ID
* @return TaskTracker地址IP:Port
*/
public static String getTaskTrackerAddress(Long instanceId) {
InstanceStatusHolder statusHolder = instanceId2StatusHolder.get(instanceId);
if (statusHolder == null) {
return null;
}
return statusHolder.getSourceAddress();
}
private static ExecuteLogRepository getExecuteLogRepository() {
while (executeLogRepository == null) {
try {

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.server.core;
package com.github.kfcfans.oms.server.service.instance;
import lombok.Data;
@ -22,4 +22,6 @@ public class InstanceStatusHolder {
// 上次上报时间
private long lastReportTime;
// 源地址TaskTracker 地址
private String sourceAddress;
}

View File

@ -1,7 +1,9 @@
package com.github.kfcfans.oms.server.service.timing;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
@ -98,12 +100,40 @@ public class InstanceStatusCheckService {
if (!CollectionUtils.isEmpty(failedInstances)) {
log.warn("[InstanceStatusCheckService] instances({}) has not received status report for a long time.", failedInstances);
failedInstances.forEach(instance -> {
// 重新派发
JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new);
dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes());
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
JobStatus jobStatus = JobStatus.of(jobInfoDO.getStatus());
// 如果任务已关闭则不进行重试将任务置为失败即可
if (jobStatus != JobStatus.ENABLE) {
updateFailedInstance(instance);
return;
}
// 秒级任务无限重试直接派发
if (timeExpressionType == TimeExpressionType.FIX_RATE || timeExpressionType == TimeExpressionType.FIX_DELAY) {
dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes());
return;
}
// CRON API一样失败次数 + 1根据重试配置进行重试
if (instance.getRunningTimes() > jobInfoDO.getInstanceRetryNum()) {
dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes());
}else {
updateFailedInstance(instance);
}
});
}
});
}
private void updateFailedInstance(ExecuteLogDO instance) {
instance.setStatus(InstanceStatus.FAILED.getV());
instance.setFinishedTime(System.currentTimeMillis());
instance.setGmtModified(new Date());
instance.setResult("worker report timeout, maybe all worker down");
executeLogRepository.saveAndFlush(instance);
}
}

View File

@ -4,8 +4,8 @@ import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.core.InstanceManager;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
@ -29,11 +29,11 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* 任务调度执行服务
* 任务调度执行服务调度 CRON 表达式的任务进行执行
* FIX_RATE和FIX_DELAY任务不需要被调度创建后直接被派发到Worker执行只需要失败重试机制在InstanceStatusCheckService中完成
*
* @author tjq
* @since 2020/4/5
@ -75,13 +75,6 @@ public class JobScheduleService {
}catch (Exception e) {
log.error("[JobScheduleService] schedule cron job failed.", e);
}
// 调度 FIX_RATE FIX_DELAY JOB
try {
scheduleFrequentJob(allAppIds);
}catch (Exception e) {
log.error("[JobScheduleService] schedule frequent job failed.", e);
}
log.info("[JobScheduleService] finished job schedule, using time {}.", stopwatch.stop());
}
@ -177,40 +170,4 @@ public class JobScheduleService {
}
});
}
/**
* 调度 FIX_RATE FIX_DELAY 的任务
*/
private void scheduleFrequentJob(List<Long> appIds) {
List<JobInfoDO> fixDelayJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionType(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_DELAY.getV());
List<JobInfoDO> fixRateJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionType(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_RATE.getV());
List<Long> jobIds = Lists.newLinkedList();
Map<Long, JobInfoDO> jobId2JobInfo = Maps.newHashMap();
Consumer<JobInfoDO> consumer = jobInfo -> {
jobIds.add(jobInfo.getId());
jobId2JobInfo.put(jobInfo.getId(), jobInfo);
};
fixDelayJobs.forEach(consumer);
fixRateJobs.forEach(consumer);
if (CollectionUtils.isEmpty(jobIds)) {
log.info("[JobScheduleService] no frequent job need to schedule for appIds in {}.", appIds);
return;
}
// 查询 ExecuteLog 不存在或非运行状态则重新调度
List<ExecuteLogDO> executeLogDOS = executeLogRepository.findByJobIdIn(jobIds);
executeLogDOS.forEach(executeLogDO -> {
if (executeLogDO.getStatus() == InstanceStatus.RUNNING.getV()) {
jobId2JobInfo.remove(executeLogDO.getJobId());
}
});
// 重新Dispatch
jobId2JobInfo.values().forEach(jobInfoDO -> {
});
}
}

View File

@ -0,0 +1,65 @@
package com.github.kfcfans.oms.server.web.controller;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.request.ServerStopInstanceReq;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.service.ha.ServerSelectService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* 任务实例 Controller
*
* @author tjq
* @since 2020/4/9
*/
@RestController
@RequestMapping("/instance")
public class InstanceController {
@Resource
private ExecuteLogRepository executeLogRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private AppInfoRepository appInfoRepository;
@GetMapping("/stop")
public ResultDTO<Void> stopInstance(Long instanceId) throws Exception {
// 联级查询instanceId -> jobId -> appId -> serverAddress
ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId);
if (executeLogDO == null) {
return ResultDTO.failed("invalid instanceId: " + instanceId);
}
JobInfoDO jobInfoDO = jobInfoRepository.findById(executeLogDO.getJobId()).orElseThrow(() -> {
throw new RuntimeException("impossible");
});
AppInfoDO appInfoDO = appInfoRepository.findById(jobInfoDO.getAppId()).orElseThrow(() -> {
throw new RuntimeException("impossible");
});
String serverAddress = appInfoDO.getCurrentServer();
// 将请求转发给目标ServerHTTP -> AKKA
ActorSelection serverActor = OhMyServer.getServerActor(serverAddress);
RedirectServerStopInstanceReq req = new RedirectServerStopInstanceReq();
req.setServerStopInstanceReq(new ServerStopInstanceReq(instanceId));
serverActor.tell(req, null);
return ResultDTO.success(null);
}
}

View File

@ -1,12 +1,18 @@
package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.service.DispatchService;
import com.github.kfcfans.oms.server.service.IdGenerateService;
import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest;
import org.springframework.beans.BeanUtils;
import org.springframework.web.bind.annotation.GetMapping;
@ -16,6 +22,7 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
import java.util.Optional;
/**
* 任务信息管理 Controller
@ -23,12 +30,16 @@ import java.util.Date;
* @author tjq
* @since 2020/3/30
*/
@RestController()
@RestController
@RequestMapping("job")
public class JobController {
@Resource
private DispatchService dispatchService;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private ExecuteLogRepository executeLogRepository;
@PostMapping("/save")
public ResultDTO<Void> saveJobInfo(ModifyJobInfoRequest request) throws Exception {
@ -54,13 +65,46 @@ public class JobController {
}
jobInfoDO.setGmtModified(now);
jobInfoRepository.saveAndFlush(jobInfoDO);
// 秒级任务直接调度执行
if (timeExpressionType == TimeExpressionType.FIX_RATE || timeExpressionType == TimeExpressionType.FIX_DELAY) {
ExecuteLogDO executeLog = new ExecuteLogDO();
executeLog.setJobId(jobInfoDO.getId());
executeLog.setInstanceId(IdGenerateService.allocate());
executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
executeLog.setExpectedTriggerTime(System.currentTimeMillis());
executeLog.setGmtCreate(new Date());
executeLog.setGmtModified(executeLog.getGmtCreate());
executeLogRepository.saveAndFlush(executeLog);
dispatchService.dispatch(jobInfoDO, executeLog.getInstanceId(), 0);
}
return ResultDTO.success(null);
}
@GetMapping("/stop")
public ResultDTO<Void> stopJob(Long jobId) throws Exception {
updateJobStatus(jobId, JobStatus.STOPPED);
return ResultDTO.success(null);
}
@GetMapping("/delete")
public ResultDTO<Void> deleteJobInfo(Long jobId) {
jobInfoRepository.deleteById(jobId);
public ResultDTO<Void> deleteJob(Long jobId) throws Exception {
updateJobStatus(jobId, JobStatus.DELETED);
return ResultDTO.success(null);
}
private void updateJobStatus(Long jobId, JobStatus status) {
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> {
throw new IllegalArgumentException("can't find job which id is " + jobId);
});
jobInfoDO.setStatus(status.getV());
jobInfoRepository.saveAndFlush(jobInfoDO);
// TODO: 关闭秒级任务
}
}

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.service.ha.ServerSelectService;

View File

@ -112,6 +112,8 @@ public class CommonTaskTracker extends TaskTracker {
req.setSucceedTaskNum(holder.succeedNum);
req.setFailedTaskNum(holder.failedNum);
req.setReportTime(System.currentTimeMillis());
req.setSourceAddress(OhMyWorker.getWorkerAddress());
// 2. 如果未完成任务数为0判断是否真正结束并获取真正结束任务的执行结果
TaskDO resultTask = null;

View File

@ -42,6 +42,8 @@ public class FrequentTaskTracker extends TaskTracker {
// 时间表达式类型
private TimeExpressionType timeExpressionType;
private long timeParams;
// 最大同时运行实例数
private int maxInstanceNum;
// 总运行次数正常情况不会出现锁竞争直接用 Atomic 系列锁竞争验证推荐 LongAdder
private AtomicLong triggerTimes;
@ -68,6 +70,7 @@ public class FrequentTaskTracker extends TaskTracker {
// 0. 初始化实例变量
timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
timeParams = Long.parseLong(req.getTimeExpression());
maxInstanceNum = req.getMaxInstanceNum();
triggerTimes = new AtomicLong(0);
succeedTimes = new AtomicLong(0);
@ -133,8 +136,16 @@ public class FrequentTaskTracker extends TaskTracker {
newRootTask.setCreatedTime(System.currentTimeMillis());
newRootTask.setLastModifiedTime(System.currentTimeMillis());
// 判断是否超出最大执行实例数
if (timeExpressionType == TimeExpressionType.FIX_RATE) {
if (subInstanceId2TimeHolder.size() > maxInstanceNum) {
log.error("[TaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId);
processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE");
return;
}
}
// 必须先持久化持久化成功才能 Dispatch否则会导致后续报错因为DB中没有这个taskId对应的记录会各种报错
// 必须先持久化持久化成功才能 dispatch否则会导致后续报错因为DB中没有这个taskId对应的记录会各种报错
if (!taskPersistenceService.save(newRootTask)) {
log.error("[TaskTracker-{}] Launcher create new root task failed.", instanceId);
processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED");
@ -259,6 +270,7 @@ public class FrequentTaskTracker extends TaskTracker {
req.setSucceedTaskNum(succeedTimes.get());
req.setFailedTaskNum(failedTimes.get());
req.setReportTime(System.currentTimeMillis());
req.setSourceAddress(OhMyWorker.getWorkerAddress());
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);