diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java index 51fadc92..1e08fb8c 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java @@ -56,4 +56,8 @@ public interface InstanceLogRepository extends JpaRepository findByJobId(long jobId, Pageable pageable); // 只会有一条数据,只是为了统一 Page findByInstanceId(long instanceId, Pageable pageable); + + // 数据统计 + long countByAppIdAndStatus(long appId, int status); + long countByAppIdAndStatusAndGmtCreateAfter(long appId, int status, Date time); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java index 6e21cad4..48d0f662 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java @@ -25,6 +25,5 @@ public interface JobInfoRepository extends JpaRepository { long countByAppId(long appId); - long countByAppIdAndStatus(long appId, int status); - long countByAppIdAndStatusAndGmtCreateAfter(long appId, int status, Date time); + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/ControllerExceptionHandler.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/ControllerExceptionHandler.java index b7f6e285..84cf5a28 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/ControllerExceptionHandler.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/ControllerExceptionHandler.java @@ -19,7 +19,13 @@ public class ControllerExceptionHandler { @ResponseBody @ExceptionHandler(Exception.class) public ResultDTO exceptionHandler(Exception e) { - log.error("[ControllerException] http request failed.", e); + + // 不是所有异常都需要打印完整堆栈,后续可以定义内部的Exception,便于判断 + if (e instanceof IllegalArgumentException) { + log.warn("[ControllerException] http request failed, message is {}.", e.getMessage()); + }else { + log.error("[ControllerException] http request failed.", e); + } return ResultDTO.failed(e.getMessage()); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java index d5cfd5e6..44272525 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java @@ -10,8 +10,6 @@ import com.github.kfcfans.oms.server.service.CacheService; import com.github.kfcfans.oms.server.service.instance.InstanceService; import com.github.kfcfans.oms.server.web.request.QueryInstanceRequest; import com.github.kfcfans.oms.server.web.response.InstanceLogVO; -import com.github.kfcfans.oms.server.web.response.JobInfoVO; -import com.google.common.collect.Lists; import org.apache.commons.lang3.time.DateFormatUtils; import org.springframework.beans.BeanUtils; import org.springframework.data.domain.Page; @@ -86,7 +84,11 @@ public class InstanceController { instanceLogVO.setJobName(cacheService.getJobName(instanceLogDO.getJobId())); // 格式化时间 - instanceLogVO.setActualTriggerTime(DateFormatUtils.format(instanceLogDO.getActualTriggerTime(), TIME_PATTERN)); + if (instanceLogDO.getActualTriggerTime() == null) { + instanceLogVO.setActualTriggerTime("N/A"); + }else { + instanceLogVO.setActualTriggerTime(DateFormatUtils.format(instanceLogDO.getActualTriggerTime(), TIME_PATTERN)); + } if (instanceLogDO.getFinishedTime() == null) { instanceLogVO.setFinishedTime("N/A"); }else { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/SystemInfoController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/SystemInfoController.java index 0e933486..5a383657 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/SystemInfoController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/SystemInfoController.java @@ -11,6 +11,7 @@ import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.requests.FriendQueryWorkerClusterStatusReq; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; +import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.oms.server.web.response.SystemOverviewVO; import com.github.kfcfans.oms.server.web.response.WorkerStatusVO; @@ -41,6 +42,8 @@ public class SystemInfoController { private AppInfoRepository appInfoRepository; @Resource private JobInfoRepository jobInfoRepository; + @Resource + private InstanceLogRepository instanceLogRepository; @GetMapping("/listWorker") @SuppressWarnings("unchecked") @@ -86,10 +89,10 @@ public class SystemInfoController { // 总任务数量 overview.setJobCount(jobInfoRepository.countByAppId(appId)); // 运行任务数 - overview.setRunningInstanceCount(jobInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV())); + overview.setRunningInstanceCount(instanceLogRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV())); // 近期失败任务数(24H内) Date date = DateUtils.addDays(new Date(), -1); - overview.setFailedInstanceCount(jobInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date)); + overview.setFailedInstanceCount(instanceLogRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date)); return ResultDTO.success(overview); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java index 05961bcb..2e39804a 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -8,7 +8,7 @@ import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor; import com.github.kfcfans.oms.worker.actors.TaskTrackerActor; import com.github.kfcfans.oms.worker.background.ServerDiscoveryService; -import com.github.kfcfans.oms.worker.background.WorkerHealthReportRunnable; +import com.github.kfcfans.oms.worker.background.WorkerHealthReporter; import com.github.kfcfans.oms.worker.common.OhMyConfig; import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.utils.NetUtils; @@ -106,7 +106,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean { // 初始化定时任务 ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build(); timingPool = Executors.newScheduledThreadPool(2, timingPoolFactory); - timingPool.scheduleAtFixedRate(new WorkerHealthReportRunnable(), 0, 15, TimeUnit.SECONDS); + timingPool.scheduleAtFixedRate(new WorkerHealthReporter(), 0, 15, TimeUnit.SECONDS); timingPool.scheduleAtFixedRate(() -> currentServer = ServerDiscoveryService.discovery(), 10, 10, TimeUnit.SECONDS); log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java index 54f2d212..390cdd06 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java @@ -5,11 +5,17 @@ import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.common.utils.HttpUtils; +import com.github.kfcfans.oms.worker.core.tracker.task.FrequentTaskTracker; +import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker; +import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; +import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * 服务发现 @@ -20,8 +26,14 @@ import java.util.Map; @Slf4j public class ServerDiscoveryService { + // 配置的可发起HTTP请求的Server(IP:Port) private static final Map IP2ADDRESS = Maps.newHashMap(); + // 服务发现地址 private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d¤tServer=%s"; + // 失败次数 + private static int FAILED_COUNT = 0; + // 最大失败次数 + private static final int MAX_FAILED_COUNT = 3; public static String discovery() { @@ -50,9 +62,27 @@ public class ServerDiscoveryService { } if (StringUtils.isEmpty(result)) { - log.error("[OMS-ServerDiscoveryService] can't find any available server, this worker has been quarantined."); + log.warn("[OMS-ServerDiscoveryService] can't find any available server, this worker has been quarantined."); + + // 在 Server 高可用的前提下,连续失败多次,说明该节点与外界失联,Server已经将秒级任务转移到其他Worker,需要杀死本地的任务 + if (FAILED_COUNT++ > MAX_FAILED_COUNT) { + + log.error("[OMS-ServerDiscoveryService] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker."); + List frequentInstanceIds = TaskTrackerPool.getAllFrequentTaskTrackerKeys(); + if (!CollectionUtils.isEmpty(frequentInstanceIds)) { + frequentInstanceIds.forEach(instanceId -> { + TaskTracker taskTracker = TaskTrackerPool.remove(instanceId); + taskTracker.destroy(); + log.warn("[OMS-ServerDiscoveryService] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId); + }); + } + + FAILED_COUNT = 0; + } return null; }else { + // 重置失败次数 + FAILED_COUNT = 0; log.debug("[OMS-ServerDiscoveryService] current server is {}.", result); return result; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReporter.java similarity index 92% rename from oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java rename to oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReporter.java index 90d3cfa8..293b41d3 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReporter.java @@ -19,7 +19,7 @@ import org.springframework.util.StringUtils; */ @Slf4j @AllArgsConstructor -public class WorkerHealthReportRunnable implements Runnable { +public class WorkerHealthReporter implements Runnable { @Override public void run() { @@ -41,6 +41,9 @@ public class WorkerHealthReportRunnable implements Runnable { // 发送请求 String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); + if (StringUtils.isEmpty(serverPath)) { + return; + } ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(serverPath); actorSelection.tell(heartbeat, null); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java index d2b76b64..23392816 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.worker.common; +import com.github.kfcfans.oms.worker.common.constants.StoreStrategy; import lombok.Data; import java.util.List; @@ -20,4 +21,8 @@ public class OhMyConfig { * 调度服务器地址,ip:port */ private List serverAddress; + /** + * 本地持久化方式,默认使用磁盘 + */ + private StoreStrategy storeStrategy = StoreStrategy.DISK; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/StoreStrategy.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/StoreStrategy.java new file mode 100644 index 00000000..84bfe49d --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/StoreStrategy.java @@ -0,0 +1,18 @@ +package com.github.kfcfans.oms.worker.common.constants; + +import lombok.AllArgsConstructor; + +/** + * 持久化策略 + * + * @author tjq + * @since 2020/4/14 + */ +@AllArgsConstructor +public enum StoreStrategy { + + DISK("磁盘"), + MEMORY("内存"); + + private String des; +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java index 5cb38c21..b2e405c7 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java @@ -2,6 +2,7 @@ package com.github.kfcfans.oms.worker.common.utils; import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.common.RemoteConstant; +import org.springframework.util.StringUtils; /** * AKKA 工具类 @@ -21,6 +22,9 @@ public class AkkaUtils { } public static String getAkkaServerPath(String actorName) { + if (StringUtils.isEmpty(OhMyWorker.getCurrentServer())) { + return null; + } return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java index c65c79e8..b6d01959 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java @@ -298,9 +298,11 @@ public class FrequentTaskTracker extends TaskTracker { req.setSourceAddress(OhMyWorker.getWorkerAddress()); String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); - ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); - + if (StringUtils.isEmpty(serverPath)) { + return; + } // 非可靠通知,Server挂掉后任务的kill工作交由其他线程去做 + ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); serverActor.tell(req, null); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java index bc397be9..fbb4433d 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java @@ -1,7 +1,10 @@ package com.github.kfcfans.oms.worker.core.tracker.task; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.function.Function; @@ -22,12 +25,22 @@ public class TaskTrackerPool { return instanceId2TaskTracker.get(instanceId); } - public static void remove(Long instanceId) { - instanceId2TaskTracker.remove(instanceId); + public static TaskTracker remove(Long instanceId) { + return instanceId2TaskTracker.remove(instanceId); } public static void atomicCreateTaskTracker(Long instanceId, Function creator) { instanceId2TaskTracker.computeIfAbsent(instanceId, creator); } + public static List getAllFrequentTaskTrackerKeys() { + List keys = Lists.newLinkedList(); + instanceId2TaskTracker.forEach((key, tk) -> { + if (tk instanceof FrequentTaskTracker) { + keys.add(key); + } + }); + return keys; + } + } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java index e9684d71..c608caee 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java @@ -1,5 +1,7 @@ package com.github.kfcfans.oms.worker.persistence; +import com.github.kfcfans.oms.worker.OhMyWorker; +import com.github.kfcfans.oms.worker.common.constants.StoreStrategy; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import org.slf4j.Logger; @@ -19,6 +21,9 @@ public class ConnectionFactory { private static volatile DataSource dataSource; + private static final String DISK_JDBC_URL = "jdbc:h2:file:~/.h2/oms/oms_worker_db"; + private static final String MEMORY_JDBC_URL = "jdbc:h2:mem:~/.h2/oms/oms_worker_db"; + public static Connection getConnection() throws SQLException { return getDataSource().getConnection(); } @@ -29,17 +34,22 @@ public class ConnectionFactory { } synchronized (ConnectionFactory.class) { if (dataSource == null) { + + StoreStrategy strategy = OhMyWorker.getConfig().getStoreStrategy(); + HikariConfig config = new HikariConfig(); config.setDriverClassName("org.h2.Driver"); - config.setJdbcUrl("jdbc:h2:file:~/.h2/oms/oms_worker_db"); + config.setJdbcUrl(strategy == StoreStrategy.DISK ? DISK_JDBC_URL : MEMORY_JDBC_URL); config.setAutoCommit(true); // 池中最小空闲连接数量 config.setMinimumIdle(2); // 池中最大连接数量 config.setMaximumPoolSize(32); dataSource = new HikariDataSource(config); + } } return dataSource; } + }