add debug mode

This commit is contained in:
tjq 2020-04-20 14:18:53 +08:00
parent 3b547f58ee
commit 67bbf9e352
19 changed files with 148 additions and 18 deletions

View File

@ -33,6 +33,8 @@ public class InstanceDetail implements OmsSerializable {
// 秒级任务专用
private List<SubInstanceDetail> subInstanceDetails;
// 重试次数
private Long runningTimes;
// 秒级任务的 extra -> List<SubInstanceDetail>
@Data

View File

@ -81,11 +81,9 @@ public class JobInfoDO {
// 最大机器数量
private Integer maxWorkerCount;
// 报警用户ID列表多值逗号分隔
private String notifyUserIds;
private Date gmtCreate;
private Date gmtModified;
// 针对只查询 id 的情况
public JobInfoDO(Long id) {
this.id = id;
}
}

View File

@ -3,6 +3,8 @@ package com.github.kfcfans.oms.server.persistence.repository;
import com.github.kfcfans.oms.server.persistence.model.UserInfoDO;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List;
/**
* 用户信息表数据库访问层
*
@ -10,4 +12,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
* @since 2020/4/12
*/
public interface UserInfoRepository extends JpaRepository<UserInfoDO, Long> {
List<UserInfoDO> findByUsernameLike(String username);
}

View File

@ -0,0 +1,35 @@
package com.github.kfcfans.oms.server.service.alarm;
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* 报警服务
*
* @author tjq
* @since 2020/4/19
*/
@Slf4j
public class AlarmService {
private static List<Alarmable> alarmableList = Lists.newLinkedList();
public static void alarm(JobInfoDO jobInfo, InstanceLogDO instanceLog) {
if (CollectionUtils.isEmpty(alarmableList)) {
return;
}
alarmableList.forEach(alarmable -> {
try {
alarmable.alarm(jobInfo, instanceLog);
}catch (Exception e) {
log.warn("[AlarmService] alarm failed.", e);
}
});
}
}

View File

@ -0,0 +1,15 @@
package com.github.kfcfans.oms.server.service.alarm;
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
/**
* 报警接口
*
* @author tjq
* @since 2020/4/19
*/
public interface Alarmable {
void alarm(JobInfoDO jobInfo, InstanceLogDO instanceLog);
}

View File

@ -10,7 +10,7 @@ import org.springframework.stereotype.Service;
/**
* 唯一ID生成服务使用 Twitter snowflake 算法
* 机房ID固定为0占用2位
* 机器ID数据库自增占用8位最多支持256台机器如果频繁部署需要删除数据库重置id
* 机器ID数据库自增占用14位如果频繁部署需要删除数据库重置id
*
* @author tjq
* @since 2020/4/6

View File

@ -16,8 +16,8 @@ class SnowFlakeIdGenerator {
/**
* 每一部分占用的位数
*/
private final static long SEQUENCE_BIT = 12; //序列号占用的位数
private final static long MACHINE_BIT = 8; //机器标识占用的位数
private final static long SEQUENCE_BIT = 6; //序列号占用的位数
private final static long MACHINE_BIT = 14; //机器标识占用的位数
private final static long DATA_CENTER_BIT = 2;//数据中心占用的位数
/**

View File

@ -9,6 +9,7 @@ import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.service.DispatchService;
import com.github.kfcfans.oms.server.service.timing.schedule.HashedWheelTimerHolder;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
@ -16,6 +17,7 @@ import org.springframework.beans.BeanUtils;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* 管理被调度的服务
@ -111,7 +113,15 @@ public class InstanceManager {
if (updateEntity.getRunningTimes() <= instanceId2JobInfo.get(instanceId).getInstanceRetryNum()) {
log.info("[InstanceManager] instance(instanceId={}) execute failed but will take the {}th retry.", instanceId, updateEntity.getRunningTimes());
getDispatchService().dispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes());
// 延迟10S重试由于重试不改变 instanceId如果派发到同一台机器上一个 TaskTracker 还处于资源释放阶段无法创建新的TaskTracker任务失败
HashedWheelTimerHolder.TIMER.schedule(() -> {
getDispatchService().dispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes());
}, 10, TimeUnit.SECONDS);
// 修改状态为 等待派发正式开始重试
// 问题会丢失以往的调度记录actualTriggerTime什么的都会被覆盖
updateEntity.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
}else {
updateEntity.setResult(req.getResult());
updateEntity.setFinishedTime(System.currentTimeMillis());

View File

@ -128,7 +128,9 @@ public class InstanceService {
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (askResponse.isSuccess()) {
return askResponse.getData(InstanceDetail.class);
InstanceDetail instanceDetail = askResponse.getData(InstanceDetail.class);
instanceDetail.setRunningTimes(instanceLogDO.getRunningTimes());
return instanceDetail;
}else {
log.warn("[InstanceService] ask InstanceStatus from TaskTracker failed, the message is {}.", askResponse.getMessage());
}

View File

@ -35,7 +35,7 @@ import java.util.stream.Collectors;
public class InstanceStatusCheckService {
private static final int MAX_BATCH_NUM = 10;
private static final long DISPATCH_TIMEOUT_MS = 10000;
private static final long DISPATCH_TIMEOUT_MS = 30000;
private static final long RECEIVE_TIMEOUT_MS = 60000;
private static final long RUNNING_TIMEOUT_MS = 60000;
@ -77,6 +77,13 @@ public class InstanceStatusCheckService {
if (!CollectionUtils.isEmpty(waitingDispatchInstances)) {
log.warn("[InstanceStatusCheckService] instances({}) is not triggered as expected.", waitingDispatchInstances);
waitingDispatchInstances.forEach(instance -> {
// 过滤因为失败重试而改成 WAITING_DISPATCH 状态的任务实例
long t = System.currentTimeMillis() - instance.getGmtModified().getTime();
if (t < DISPATCH_TIMEOUT_MS) {
return;
}
// 重新派发(orElseGet用于消除编译器警告...)
JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new);
dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), 0);

View File

@ -13,12 +13,15 @@ import com.github.kfcfans.oms.server.service.JobService;
import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest;
import com.github.kfcfans.oms.server.web.request.QueryJobInfoRequest;
import com.github.kfcfans.oms.server.web.response.JobInfoVO;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
@ -44,6 +47,9 @@ public class JobController {
@Resource
private JobInfoRepository jobInfoRepository;
private static final Splitter commaSplitter = Splitter.on(",");
private static final Joiner commaJoiner = Joiner.on(",").skipNulls();
@PostMapping("/save")
public ResultDTO<Void> saveJobInfo(@RequestBody ModifyJobInfoRequest request) throws Exception {
@ -61,6 +67,11 @@ public class JobController {
jobInfoDO.setMaxInstanceNum(0);
}
// 转化报警用户列表
if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) {
jobInfoDO.setNotifyUserIds(commaJoiner.join(request.getNotifyUserIds()));
}
// 计算下次调度时间
Date now = new Date();
if (timeExpressionType == TimeExpressionType.CRON) {
@ -159,6 +170,12 @@ public class JobController {
jobInfoVO.setProcessorType(processorType.name());
jobInfoVO.setEnable(jobInfoDO.getStatus() == JobStatus.ENABLE.getV());
if (!StringUtils.isEmpty(jobInfoDO.getNotifyUserIds())) {
jobInfoVO.setNotifyUserIds(commaSplitter.splitToList(jobInfoDO.getNotifyUserIds()));
}else {
jobInfoVO.setNotifyUserIds(Lists.newLinkedList());
}
return jobInfoVO;
}

View File

@ -4,10 +4,13 @@ import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.oms.server.persistence.model.UserInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.UserInfoRepository;
import com.github.kfcfans.oms.server.web.request.ModifyUserInfoRequest;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
@ -39,11 +42,23 @@ public class UserInfoController {
}
@GetMapping("list")
public ResultDTO<List<UserItemVO>> list() {
List<UserItemVO> result = userInfoRepository.findAll().stream().map(x -> new UserItemVO(x.getId(), x.getUsername())).collect(Collectors.toList());
return ResultDTO.success(result);
public ResultDTO<List<UserItemVO>> list(@RequestParam(required = false) String name) {
List<UserInfoDO> result;
if (StringUtils.isEmpty(name)) {
result = userInfoRepository.findAll();
}else {
result = userInfoRepository.findByUsernameLike("%" + name + "%");
}
return ResultDTO.success(convert(result));
}
private static List<UserItemVO> convert(List<UserInfoDO> data) {
if (CollectionUtils.isEmpty(data)) {
return Lists.newLinkedList();
}
return data.stream().map(x -> new UserItemVO(x.getId(), x.getUsername())).collect(Collectors.toList());
}
@Getter
@NoArgsConstructor

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.oms.server.web.request;
import lombok.Data;
import java.util.Date;
import java.util.List;
/**
* 创建/修改 JobInfo 请求
@ -74,4 +75,7 @@ public class ModifyJobInfoRequest {
private String designatedWorkers;
// 最大机器数量
private Integer maxWorkerCount;
// 报警用户ID列表
private List<Long> notifyUserIds;
}

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.oms.server.web.response;
import lombok.Data;
import java.util.Date;
import java.util.List;
/**
* JobInfo 对外展示对象
@ -72,4 +73,7 @@ public class JobInfoVO {
private String designatedWorkers;
// 最大机器数量
private Integer maxWorkerCount;
// 报警用户ID列表
private List<String> notifyUserIds;
}

View File

@ -75,7 +75,11 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
try {
// 校验 appName
appId = assertAppName();
if (!config.isEnableTestMode()) {
appId = assertAppName();
}else {
log.warn("[OhMyWorker] using TestMode now, it's dangerous if this is production env.");
}
// 初始化 ActorSystem
Map<String, Object> overrideConfig = Maps.newHashMap();
@ -99,7 +103,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
// 服务发现
currentServer = ServerDiscoveryService.discovery();
if (StringUtils.isEmpty(currentServer)) {
if (StringUtils.isEmpty(currentServer) && !config.isEnableTestMode()) {
throw new RuntimeException("can't find any available server, this worker has been quarantined.");
}
log.info("[OhMyWorker] discovery server succeed, current server is {}.", currentServer);

View File

@ -30,4 +30,9 @@ public class OhMyConfig {
*/
private int maxResultLength = 8096;
/**
* 启动测试模式true情况下不再尝试连接 server 并验证appName
* true -> 用于本地写单元测试调试 false -> 默认值标准模式
*/
private boolean enableTestMode = false;
}

View File

@ -264,6 +264,7 @@ public abstract class TaskTracker {
*/
public void destroy() {
Stopwatch sw = Stopwatch.createStarted();
// 0. 开始关闭线程池不能使用 shutdownNow()因为 destroy 方法本身就在 scheduledPool 的线程中执行强行关闭会打断 destroy 的执行
scheduledPool.shutdown();
@ -290,7 +291,7 @@ public abstract class TaskTracker {
// 3. 移除顶层引用送去 GC
TaskTrackerPool.remove(instanceId);
log.info("[TaskTracker-{}] TaskTracker has left the world, bye~", instanceId);
log.info("[TaskTracker-{}] TaskTracker has left the world(using {}), bye~", instanceId, sw.stop());
// 4. 强制关闭线程池
if (!scheduledPool.isTerminated()) {

View File

@ -30,6 +30,8 @@ public class CommonTaskTrackerTest {
OhMyConfig ohMyConfig = new OhMyConfig();
ohMyConfig.setAppName("oms-test");
ohMyConfig.setServerAddress(Lists.newArrayList("127.0.0.1:7700"));
ohMyConfig.setEnableTestMode(true);
OhMyWorker worker = new OhMyWorker();
worker.setConfig(ohMyConfig);
worker.init();

View File

@ -2,7 +2,11 @@ akka {
actor {
# for test
provider = remote
allow-java-serialization = on
allow-java-serialization = off
serialization-bindings {
"com.github.kfcfans.common.OmsSerializable" = jackson-cbor
}
}
remote {
artery {