almost finished the v1.0.0-beta, prepare for release recently

This commit is contained in:
tjq 2020-04-14 20:18:27 +08:00
parent 2a3640c88c
commit 07d63df4d7
14 changed files with 116 additions and 17 deletions

View File

@ -56,4 +56,8 @@ public interface InstanceLogRepository extends JpaRepository<InstanceLogDO, Long
Page<InstanceLogDO> findByJobId(long jobId, Pageable pageable);
// 只会有一条数据只是为了统一
Page<InstanceLogDO> findByInstanceId(long instanceId, Pageable pageable);
// 数据统计
long countByAppIdAndStatus(long appId, int status);
long countByAppIdAndStatusAndGmtCreateAfter(long appId, int status, Date time);
}

View File

@ -25,6 +25,5 @@ public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> {
long countByAppId(long appId);
long countByAppIdAndStatus(long appId, int status);
long countByAppIdAndStatusAndGmtCreateAfter(long appId, int status, Date time);
}

View File

@ -19,7 +19,13 @@ public class ControllerExceptionHandler {
@ResponseBody
@ExceptionHandler(Exception.class)
public ResultDTO<Void> exceptionHandler(Exception e) {
log.error("[ControllerException] http request failed.", e);
// 不是所有异常都需要打印完整堆栈后续可以定义内部的Exception便于判断
if (e instanceof IllegalArgumentException) {
log.warn("[ControllerException] http request failed, message is {}.", e.getMessage());
}else {
log.error("[ControllerException] http request failed.", e);
}
return ResultDTO.failed(e.getMessage());
}
}

View File

@ -10,8 +10,6 @@ import com.github.kfcfans.oms.server.service.CacheService;
import com.github.kfcfans.oms.server.service.instance.InstanceService;
import com.github.kfcfans.oms.server.web.request.QueryInstanceRequest;
import com.github.kfcfans.oms.server.web.response.InstanceLogVO;
import com.github.kfcfans.oms.server.web.response.JobInfoVO;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.data.domain.Page;
@ -86,7 +84,11 @@ public class InstanceController {
instanceLogVO.setJobName(cacheService.getJobName(instanceLogDO.getJobId()));
// 格式化时间
instanceLogVO.setActualTriggerTime(DateFormatUtils.format(instanceLogDO.getActualTriggerTime(), TIME_PATTERN));
if (instanceLogDO.getActualTriggerTime() == null) {
instanceLogVO.setActualTriggerTime("N/A");
}else {
instanceLogVO.setActualTriggerTime(DateFormatUtils.format(instanceLogDO.getActualTriggerTime(), TIME_PATTERN));
}
if (instanceLogDO.getFinishedTime() == null) {
instanceLogVO.setFinishedTime("N/A");
}else {

View File

@ -11,6 +11,7 @@ import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.requests.FriendQueryWorkerClusterStatusReq;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.web.response.SystemOverviewVO;
import com.github.kfcfans.oms.server.web.response.WorkerStatusVO;
@ -41,6 +42,8 @@ public class SystemInfoController {
private AppInfoRepository appInfoRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private InstanceLogRepository instanceLogRepository;
@GetMapping("/listWorker")
@SuppressWarnings("unchecked")
@ -86,10 +89,10 @@ public class SystemInfoController {
// 总任务数量
overview.setJobCount(jobInfoRepository.countByAppId(appId));
// 运行任务数
overview.setRunningInstanceCount(jobInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV()));
overview.setRunningInstanceCount(instanceLogRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV()));
// 近期失败任务数24H内
Date date = DateUtils.addDays(new Date(), -1);
overview.setFailedInstanceCount(jobInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date));
overview.setFailedInstanceCount(instanceLogRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date));
return ResultDTO.success(overview);
}

View File

@ -8,7 +8,7 @@ import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor;
import com.github.kfcfans.oms.worker.actors.TaskTrackerActor;
import com.github.kfcfans.oms.worker.background.ServerDiscoveryService;
import com.github.kfcfans.oms.worker.background.WorkerHealthReportRunnable;
import com.github.kfcfans.oms.worker.background.WorkerHealthReporter;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.utils.NetUtils;
@ -106,7 +106,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
// 初始化定时任务
ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build();
timingPool = Executors.newScheduledThreadPool(2, timingPoolFactory);
timingPool.scheduleAtFixedRate(new WorkerHealthReportRunnable(), 0, 15, TimeUnit.SECONDS);
timingPool.scheduleAtFixedRate(new WorkerHealthReporter(), 0, 15, TimeUnit.SECONDS);
timingPool.scheduleAtFixedRate(() -> currentServer = ServerDiscoveryService.discovery(), 10, 10, TimeUnit.SECONDS);
log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch);

View File

@ -5,11 +5,17 @@ import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.utils.HttpUtils;
import com.github.kfcfans.oms.worker.core.tracker.task.FrequentTaskTracker;
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 服务发现
@ -20,8 +26,14 @@ import java.util.Map;
@Slf4j
public class ServerDiscoveryService {
// 配置的可发起HTTP请求的ServerIP:Port
private static final Map<String, String> IP2ADDRESS = Maps.newHashMap();
// 服务发现地址
private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d&currentServer=%s";
// 失败次数
private static int FAILED_COUNT = 0;
// 最大失败次数
private static final int MAX_FAILED_COUNT = 3;
public static String discovery() {
@ -50,9 +62,27 @@ public class ServerDiscoveryService {
}
if (StringUtils.isEmpty(result)) {
log.error("[OMS-ServerDiscoveryService] can't find any available server, this worker has been quarantined.");
log.warn("[OMS-ServerDiscoveryService] can't find any available server, this worker has been quarantined.");
// Server 高可用的前提下连续失败多次说明该节点与外界失联Server已经将秒级任务转移到其他Worker需要杀死本地的任务
if (FAILED_COUNT++ > MAX_FAILED_COUNT) {
log.error("[OMS-ServerDiscoveryService] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");
List<Long> frequentInstanceIds = TaskTrackerPool.getAllFrequentTaskTrackerKeys();
if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
frequentInstanceIds.forEach(instanceId -> {
TaskTracker taskTracker = TaskTrackerPool.remove(instanceId);
taskTracker.destroy();
log.warn("[OMS-ServerDiscoveryService] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);
});
}
FAILED_COUNT = 0;
}
return null;
}else {
// 重置失败次数
FAILED_COUNT = 0;
log.debug("[OMS-ServerDiscoveryService] current server is {}.", result);
return result;
}

View File

@ -19,7 +19,7 @@ import org.springframework.util.StringUtils;
*/
@Slf4j
@AllArgsConstructor
public class WorkerHealthReportRunnable implements Runnable {
public class WorkerHealthReporter implements Runnable {
@Override
public void run() {
@ -41,6 +41,9 @@ public class WorkerHealthReportRunnable implements Runnable {
// 发送请求
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
if (StringUtils.isEmpty(serverPath)) {
return;
}
ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(serverPath);
actorSelection.tell(heartbeat, null);
}

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.worker.common;
import com.github.kfcfans.oms.worker.common.constants.StoreStrategy;
import lombok.Data;
import java.util.List;
@ -20,4 +21,8 @@ public class OhMyConfig {
* 调度服务器地址ip:port
*/
private List<String> serverAddress;
/**
* 本地持久化方式默认使用磁盘
*/
private StoreStrategy storeStrategy = StoreStrategy.DISK;
}

View File

@ -0,0 +1,18 @@
package com.github.kfcfans.oms.worker.common.constants;
import lombok.AllArgsConstructor;
/**
* 持久化策略
*
* @author tjq
* @since 2020/4/14
*/
@AllArgsConstructor
public enum StoreStrategy {
DISK("磁盘"),
MEMORY("内存");
private String des;
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.oms.worker.common.utils;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.common.RemoteConstant;
import org.springframework.util.StringUtils;
/**
* AKKA 工具类
@ -21,6 +22,9 @@ public class AkkaUtils {
}
public static String getAkkaServerPath(String actorName) {
if (StringUtils.isEmpty(OhMyWorker.getCurrentServer())) {
return null;
}
return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName);
}

View File

@ -298,9 +298,11 @@ public class FrequentTaskTracker extends TaskTracker {
req.setSourceAddress(OhMyWorker.getWorkerAddress());
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
if (StringUtils.isEmpty(serverPath)) {
return;
}
// 非可靠通知Server挂掉后任务的kill工作交由其他线程去做
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
serverActor.tell(req, null);
}

View File

@ -1,7 +1,10 @@
package com.github.kfcfans.oms.worker.core.tracker.task;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@ -22,12 +25,22 @@ public class TaskTrackerPool {
return instanceId2TaskTracker.get(instanceId);
}
public static void remove(Long instanceId) {
instanceId2TaskTracker.remove(instanceId);
public static TaskTracker remove(Long instanceId) {
return instanceId2TaskTracker.remove(instanceId);
}
public static void atomicCreateTaskTracker(Long instanceId, Function<Long, TaskTracker> creator) {
instanceId2TaskTracker.computeIfAbsent(instanceId, creator);
}
public static List<Long> getAllFrequentTaskTrackerKeys() {
List<Long> keys = Lists.newLinkedList();
instanceId2TaskTracker.forEach((key, tk) -> {
if (tk instanceof FrequentTaskTracker) {
keys.add(key);
}
});
return keys;
}
}

View File

@ -1,5 +1,7 @@
package com.github.kfcfans.oms.worker.persistence;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.constants.StoreStrategy;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger;
@ -19,6 +21,9 @@ public class ConnectionFactory {
private static volatile DataSource dataSource;
private static final String DISK_JDBC_URL = "jdbc:h2:file:~/.h2/oms/oms_worker_db";
private static final String MEMORY_JDBC_URL = "jdbc:h2:mem:~/.h2/oms/oms_worker_db";
public static Connection getConnection() throws SQLException {
return getDataSource().getConnection();
}
@ -29,17 +34,22 @@ public class ConnectionFactory {
}
synchronized (ConnectionFactory.class) {
if (dataSource == null) {
StoreStrategy strategy = OhMyWorker.getConfig().getStoreStrategy();
HikariConfig config = new HikariConfig();
config.setDriverClassName("org.h2.Driver");
config.setJdbcUrl("jdbc:h2:file:~/.h2/oms/oms_worker_db");
config.setJdbcUrl(strategy == StoreStrategy.DISK ? DISK_JDBC_URL : MEMORY_JDBC_URL);
config.setAutoCommit(true);
// 池中最小空闲连接数量
config.setMinimumIdle(2);
// 池中最大连接数量
config.setMaximumPoolSize(32);
dataSource = new HikariDataSource(config);
}
}
return dataSource;
}
}