diff --git a/oh-my-scheduler-server/pom.xml b/oh-my-scheduler-server/pom.xml index e10749f5..ccf5e0ad 100644 --- a/oh-my-scheduler-server/pom.xml +++ b/oh-my-scheduler-server/pom.xml @@ -53,13 +53,6 @@ ${akka.version} - - - org.apache.curator - curator-recipes - ${curator.version} - - org.springframework.boot diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/OhMyApplication.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/OhMyApplication.java index 731f5d13..d69313a4 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/OhMyApplication.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/OhMyApplication.java @@ -3,6 +3,7 @@ package com.github.kfcfans.oms.server; import com.github.kfcfans.oms.server.core.akka.OhMyServer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; /** * SpringBoot 启动入口 @@ -10,6 +11,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; * @author tjq * @since 2020/3/29 */ +@EnableScheduling @SpringBootApplication public class OhMyApplication { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/CuratorConfig.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/CuratorConfig.java deleted file mode 100644 index c1e09a4a..00000000 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/CuratorConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.github.kfcfans.oms.server.common.config; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * ZooKeeper 连接配置 - * - * @author tjq - * @since 2020/4/4 - */ -@Configuration -public class CuratorConfig { - - @Value("${zookeeper.address}") - private String zkAddress; - - @Bean("omsCurator") - public CuratorFramework initCurator() { - CuratorFramework client = CuratorFrameworkFactory.builder() - .namespace("oms") - // zookeeper 地址,多值用 , 分割即可 - .connectString(zkAddress) - .sessionTimeoutMs(1000) - .connectionTimeoutMs(1000) - .retryPolicy(new ExponentialBackoffRetry(1000, 3)) - .build(); - client.start(); - return client; - } - -} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java index 98c5849f..6688ef72 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java @@ -13,7 +13,7 @@ import java.util.Date; */ @Data @Entity -@Table(name = "app_info") +@Table(name = "app_info", uniqueConstraints = {@UniqueConstraint(name = "appNameUK", columnNames = {"appName"})}) public class AppInfoDO { @Id diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java index 00f73962..225b9503 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java @@ -14,7 +14,7 @@ import java.util.List; public interface JobInfoRepository extends JpaRepository { - List findByAppIdInAndStatusAndTimeExpressionAndNextTriggerTimeLessThanEqual(List appIds, int status, int timeExpressionType, long time); + List findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List appIds, int status, int timeExpressionType, long time); - List findByAppIdInAndStatusAndTimeExpression(List appIds, int status, int timeExpressionType); + List findByAppIdInAndStatusAndTimeExpressionType(List appIds, int status, int timeExpressionType); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java index e6debd6f..74b3ce76 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java @@ -121,7 +121,7 @@ public class ServerSelectService { downServerCache.remove(serverAddress); return response.isSuccess(); }catch (Exception e) { - log.warn("[ServerSelectService] server({}) was down, try to elect a new server.", serverAddress); + log.warn("[ServerSelectService] server({}) was down, I will be the new server.", serverAddress); } downServerCache.add(serverAddress); return false; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java index a9692233..480d878d 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java @@ -74,7 +74,7 @@ public class JobScheduleService { log.error("[JobScheduleService] schedule cron job failed.", e); } log.info("[JobScheduleService] finished cron schedule, using time {}.", stopwatch); - stopwatch.reset(); + stopwatch.reset().start(); try { scheduleFrequentJob(allAppIds); @@ -100,10 +100,17 @@ public class JobScheduleService { try { // 查询条件:任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行 - List jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); + List jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); + + if (CollectionUtils.isEmpty(jobInfos)) { + log.debug("[JobScheduleService] no cron job need to schedule"); + return; + } // 1. 批量写日志表 Map jobId2InstanceId = Maps.newHashMap(); + log.info("[JobScheduleService] try to schedule some cron jobs, their jobId is {}.", jobId2InstanceId.keySet()); + List executeLogs = Lists.newLinkedList(); jobInfos.forEach(jobInfoDO -> { @@ -173,8 +180,8 @@ public class JobScheduleService { */ private void scheduleFrequentJob(List appIds) { - List fixDelayJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpression(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_DELAY.getV()); - List fixRateJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpression(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_RATE.getV()); + List fixDelayJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionType(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_DELAY.getV()); + List fixRateJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionType(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_RATE.getV()); List jobIds = Lists.newLinkedList(); Map jobId2JobInfo = Maps.newHashMap(); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/AppInfoController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/AppInfoController.java index 793714e9..ec2a14cc 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/AppInfoController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/AppInfoController.java @@ -11,6 +11,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; +import java.util.Date; import java.util.List; import java.util.stream.Collectors; @@ -32,6 +33,11 @@ public class AppInfoController { AppInfoDO appInfoDO = new AppInfoDO(); BeanUtils.copyProperties(appInfoRequest, appInfoDO); + Date now = new Date(); + if (appInfoRequest.getId() == null) { + appInfoDO.setGmtCreate(now); + } + appInfoDO.setGmtModified(now); appInfoRepository.saveAndFlush(appInfoDO); return ResultDTO.success(null); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java index 7f0df008..50d863ea 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java @@ -37,17 +37,22 @@ public class JobController { BeanUtils.copyProperties(request, jobInfoDO); // 拷贝枚举值 - TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(request.getTimeExpression()); + TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(request.getTimeExpressionType()); jobInfoDO.setExecuteType(ExecuteType.valueOf(request.getExecuteType()).getV()); jobInfoDO.setProcessorType(ProcessorType.valueOf(request.getProcessorType()).getV()); jobInfoDO.setTimeExpressionType(timeExpressionType.getV()); // 计算下次调度时间 + Date now = new Date(); if (timeExpressionType == TimeExpressionType.CRON) { CronExpression cronExpression = new CronExpression(request.getTimeExpression()); - Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date()); + Date nextValidTime = cronExpression.getNextValidTimeAfter(now); jobInfoDO.setNextTriggerTime(nextValidTime.getTime()); } + if (request.getId() == null) { + jobInfoDO.setGmtCreate(now); + } + jobInfoDO.setGmtModified(now); jobInfoRepository.saveAndFlush(jobInfoDO); return ResultDTO.success(null); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java index 42ffd3d9..92100056 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java @@ -4,6 +4,7 @@ import lombok.Data; /** * 创建/修改 JobInfo 请求 + * 测试用例快速复制区域:MAP_REDUCE、EMBEDDED_JAVA、CRON、com.github.kfcfans.oms.processors.TestMapReduceProcessor * * @author tjq * @since 2020/3/30 diff --git a/oh-my-scheduler-server/src/main/resources/application.properties b/oh-my-scheduler-server/src/main/resources/application.properties index f64e807a..09396ca1 100644 --- a/oh-my-scheduler-server/src/main/resources/application.properties +++ b/oh-my-scheduler-server/src/main/resources/application.properties @@ -11,7 +11,4 @@ spring.datasource.hikari.maximum-pool-size=20 spring.datasource.hikari.minimum-idle=5 # JPA 相关配置 spring.jpa.show-sql=true -spring.jpa.hibernate.ddl-auto=update - -# ZooKeeper(多值逗号分割) -zookeeper.address=115.159.215.229:2181 \ No newline at end of file +spring.jpa.hibernate.ddl-auto=update \ No newline at end of file diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java index d86e7aae..791ecc83 100644 --- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java @@ -1,7 +1,11 @@ package com.github.kfcfans.oms.server.test; import com.github.kfcfans.common.utils.NetUtils; +import com.github.kfcfans.oms.server.common.constans.JobStatus; +import com.github.kfcfans.oms.server.common.constans.TimeExpressionType; +import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.model.OmsLockDO; +import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository; import org.assertj.core.util.Lists; import org.junit.Test; @@ -22,6 +26,8 @@ import java.util.List; @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class RepositoryTest { + @Resource + private JobInfoRepository jobInfoRepository; @Resource private OmsLockRepository omsLockRepository; @@ -40,4 +46,10 @@ public class RepositoryTest { omsLockRepository.flush(); } + @Test + public void testSelectCronJobSQL() { + List result = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(Lists.newArrayList(1L), JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), System.currentTimeMillis()); + System.out.println(result); + } + } diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java index 616cf745..9d6cdc75 100644 --- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java @@ -1,11 +1,13 @@ package com.github.kfcfans.oms.server.test; +import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.common.utils.timewheel.HashedWheelTimer; import com.github.kfcfans.oms.server.common.utils.timewheel.TimerFuture; import com.github.kfcfans.oms.server.common.utils.timewheel.TimerTask; import com.google.common.collect.Lists; import org.junit.Test; +import java.util.Date; import java.util.List; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -62,8 +64,11 @@ public class UtilsTest { } @Test - public void testCronExpression() { - + public void testCronExpression() throws Exception { + String cron = "0 * * * * ? *"; + CronExpression cronExpression = new CronExpression(cron); + final Date nextValidTimeAfter = cronExpression.getNextValidTimeAfter(new Date()); + System.out.println(nextValidTimeAfter); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java index 0618023d..54f2d212 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java @@ -5,9 +5,12 @@ import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.common.utils.HttpUtils; +import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; +import java.util.Map; + /** * 服务发现 * @@ -17,32 +20,58 @@ import org.springframework.util.StringUtils; @Slf4j public class ServerDiscoveryService { + private static final Map IP2ADDRESS = Maps.newHashMap(); private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d¤tServer=%s"; - @SuppressWarnings("rawtypes") + public static String discovery() { + if (IP2ADDRESS.isEmpty()) { + OhMyWorker.getConfig().getServerAddress().forEach(x -> IP2ADDRESS.put(x.split(":")[0], x)); + } + String result = null; + + // 先对当前机器发起请求 + String currentServer = OhMyWorker.getCurrentServer(); + if (!StringUtils.isEmpty(currentServer)) { + String ip = currentServer.split(":")[0]; + // 直接请求当前Server的HTTP服务,可以少一次网络开销,减轻Server负担 + String firstServerAddress = IP2ADDRESS.get(ip); + result = acquire(firstServerAddress); + } + for (String httpServerAddress : OhMyWorker.getConfig().getServerAddress()) { - - String url = String.format(DISCOVERY_URL, httpServerAddress, OhMyWorker.getAppId(), OhMyWorker.getCurrentServer()); - try { - result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url)); - }catch (Exception ignore) { - } - - if (!StringUtils.isEmpty(result)) { - - ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class); - if (resultDTO.isSuccess()) { - String server = resultDTO.getData().toString(); - log.debug("[OMS-ServerDiscoveryService] current server is {}.", server); - return server; - } + if (StringUtils.isEmpty(result)) { + result = acquire(httpServerAddress); + }else { + break; } } - log.error("[OMS-ServerDiscoveryService] can't find any available server, this worker has been quarantined."); - return null; + + if (StringUtils.isEmpty(result)) { + log.error("[OMS-ServerDiscoveryService] can't find any available server, this worker has been quarantined."); + return null; + }else { + log.debug("[OMS-ServerDiscoveryService] current server is {}.", result); + return result; + } } + @SuppressWarnings("rawtypes") + private static String acquire(String httpServerAddress) { + String result = null; + String url = String.format(DISCOVERY_URL, httpServerAddress, OhMyWorker.getAppId(), OhMyWorker.getCurrentServer()); + try { + result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url)); + }catch (Exception ignore) { + } + if (!StringUtils.isEmpty(result)) { + ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class); + if (resultDTO.isSuccess()) { + return resultDTO.getData().toString(); + } + } + return null; + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java index c549c947..90d3cfa8 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java @@ -9,6 +9,7 @@ import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.common.utils.SystemInfoUtils; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; /** * Worker健康度定时上报 @@ -23,8 +24,13 @@ public class WorkerHealthReportRunnable implements Runnable { @Override public void run() { - SystemMetrics systemMetrics = SystemInfoUtils.getSystemMetrics(); + // 没有可用Server,无法上报 + String currentServer = OhMyWorker.getCurrentServer(); + if (StringUtils.isEmpty(currentServer)) { + return; + } + SystemMetrics systemMetrics = SystemInfoUtils.getSystemMetrics(); WorkerHeartbeat heartbeat = new WorkerHeartbeat(); heartbeat.setSystemMetrics(systemMetrics); @@ -34,7 +40,7 @@ public class WorkerHealthReportRunnable implements Runnable { heartbeat.setHeartbeatTime(System.currentTimeMillis()); // 发送请求 - String serverPath = AkkaUtils.getAkkaServerNodePath(RemoteConstant.SERVER_ACTOR_NAME); + String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(serverPath); actorSelection.tell(heartbeat, null); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java index cc773323..fbf0a9bd 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java @@ -14,18 +14,14 @@ public class AkkaUtils { /** * akka://@:/ */ - private static final String AKKA_REMOTE_NODE_PATH = "akka://%s@%s:%d/user/%s"; + private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s"; - private static final String AKKA_SERVER_NODE_PATH = "akka://%s@%s/user/%s"; - - public static String getAkkaRemotePath(String ip, String actorName) { - Integer configPort = OhMyWorker.getConfig().getListeningPort(); - int port = configPort == null ? RemoteConstant.DEFAULT_CLIENT_PORT : configPort; - return String.format(AKKA_REMOTE_NODE_PATH, RemoteConstant.ACTOR_SYSTEM_NAME, ip, port, actorName); + public static String getAkkaWorkerPath(String address, String actorName) { + return String.format(AKKA_NODE_PATH, RemoteConstant.ACTOR_SYSTEM_NAME, address, actorName); } - public static String getAkkaServerNodePath(String actorName) { - return String.format(AKKA_SERVER_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName); + public static String getAkkaServerPath(String actorName) { + return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatus.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatus.java index 16998b0f..79366377 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatus.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatus.java @@ -17,8 +17,8 @@ public class ProcessorTrackerStatus { private static final int DISPATCH_THRESHOLD = 20; private static final int HEARTBEAT_TIMEOUT_MS = 60000; - // 冗余存储一份 IP 地址 - private String ip; + // 冗余存储一份 address 地址 + private String address; // 上次活跃时间 private long lastActiveTime; // 等待执行任务数 @@ -31,8 +31,8 @@ public class ProcessorTrackerStatus { /** * 初始化 ProcessorTracker,此时并未持有实际的 ProcessorTracker 状态 */ - public void init(String ptIP) { - this.ip = ptIP; + public void init(String address) { + this.address = address; this.lastActiveTime = System.currentTimeMillis(); this.remainTaskNum = 0; this.dispatched = false; @@ -50,7 +50,7 @@ public class ProcessorTrackerStatus { return; } - this.ip = req.getIp(); + this.address = req.getAddress(); this.lastActiveTime = req.getTime(); this.remainTaskNum = req.getRemainTaskNum(); this.dispatched = true; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatusHolder.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatusHolder.java index c252f4a6..f97a8320 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatusHolder.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatusHolder.java @@ -15,27 +15,28 @@ import java.util.Map; */ public class ProcessorTrackerStatusHolder { - private final Map ip2Status; + // ProcessorTracker的address(IP:Port) -> 状态 + private final Map address2Status; public ProcessorTrackerStatusHolder(List allWorkerAddress) { - ip2Status = Maps.newConcurrentMap(); - allWorkerAddress.forEach(ip -> { + address2Status = Maps.newConcurrentMap(); + allWorkerAddress.forEach(address -> { ProcessorTrackerStatus pts = new ProcessorTrackerStatus(); - pts.init(ip); - ip2Status.put(ip, pts); + pts.init(address); + address2Status.put(address, pts); }); } - public ProcessorTrackerStatus getProcessorTrackerStatus(String ip) { - return ip2Status.get(ip); + public ProcessorTrackerStatus getProcessorTrackerStatus(String address) { + return address2Status.get(address); } /** * 根据 ProcessorTracker 的心跳更新状态 */ public void updateStatus(ProcessorTrackerStatusReportReq heartbeatReq) { - ProcessorTrackerStatus processorTrackerStatus = ip2Status.get(heartbeatReq.getIp()); + ProcessorTrackerStatus processorTrackerStatus = address2Status.get(heartbeatReq.getAddress()); processorTrackerStatus.update(heartbeatReq); } @@ -45,9 +46,9 @@ public class ProcessorTrackerStatusHolder { public List getAvailableProcessorTrackers() { List result = Lists.newLinkedList(); - ip2Status.forEach((ip, ptStatus) -> { + address2Status.forEach((address, ptStatus) -> { if (ptStatus.available()) { - result.add(ip); + result.add(address); } }); return result; @@ -57,7 +58,7 @@ public class ProcessorTrackerStatusHolder { * 获取所有 ProcessorTracker 的IP地址(包括不可用状态) */ public List getAllProcessorTrackers() { - return Lists.newArrayList(ip2Status.keySet()); + return Lists.newArrayList(address2Status.keySet()); } /** @@ -66,7 +67,7 @@ public class ProcessorTrackerStatusHolder { public List getAllDisconnectedProcessorTrackers() { List result = Lists.newLinkedList(); - ip2Status.forEach((ip, ptStatus) -> { + address2Status.forEach((ip, ptStatus) -> { if (ptStatus.isTimeout()) { result.add(ip); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index 4e25c7c2..f37f24c2 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -53,7 +53,7 @@ public class ProcessorTracker { this.instanceInfo = request.getInstanceInfo(); this.instanceId = request.getInstanceInfo().getInstanceId(); this.taskTrackerAddress = request.getTaskTrackerAddress(); - String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME); this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); // 初始化 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index b35e56f6..b2ce67c5 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -198,7 +198,7 @@ public class TaskTracker { rootTask.setInstanceId(instanceInfo.getInstanceId()); rootTask.setTaskId(TaskConstant.ROOT_TASK_ID); rootTask.setFailedCnt(0); - rootTask.setAddress(NetUtils.getLocalHost()); + rootTask.setAddress(OhMyWorker.getWorkerAddress()); rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME); rootTask.setCreatedTime(System.currentTimeMillis()); rootTask.setLastModifiedTime(System.currentTimeMillis()); @@ -221,7 +221,7 @@ public class TaskTracker { TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq(); stopRequest.setInstanceId(instanceId); ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> { - String ptPath = AkkaUtils.getAkkaRemotePath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); + String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptPath); // 不可靠通知,ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭 ptActor.tell(stopRequest, null); @@ -289,7 +289,7 @@ public class TaskTracker { if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) { ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); } - String ptActorPath = AkkaUtils.getAkkaRemotePath(ptAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); + String ptActorPath = AkkaUtils.getAkkaWorkerPath(ptAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath); ptActor.tell(startTaskReq, null); @@ -380,7 +380,7 @@ public class TaskTracker { TaskDO newLastTask = new TaskDO(); newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME); newLastTask.setTaskId(TaskConstant.LAST_TASK_ID); - newLastTask.setAddress(NetUtils.getLocalHost()); + newLastTask.setAddress(OhMyWorker.getWorkerAddress()); addTask(Lists.newArrayList(newLastTask)); } @@ -388,7 +388,7 @@ public class TaskTracker { finished.set(finishedBoolean); } - String serverPath = AkkaUtils.getAkkaServerNodePath(RemoteConstant.SERVER_ACTOR_NAME); + String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); // 3. 执行完毕,报告服务器 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java index 3b6d2560..036ae699 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java @@ -25,7 +25,7 @@ public class TaskDO { private String taskName; // 任务对象(序列化后的二进制数据) private byte[] taskContent; - // 对于JobTracker为workerAddress,对于普通Worker为jobTrackerAddress + // 对于TaskTracker为workerAddress(派发地址),对于普通Worker为TaskTrackerAddress(汇报地址) private String address; // 任务状态,0~10代表 JobTracker 使用,11~20代表普通Worker使用 private Integer status; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java index b0d4c41f..50f95549 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.worker.pojo.request; import com.github.kfcfans.common.utils.NetUtils; +import com.github.kfcfans.oms.worker.OhMyWorker; import lombok.Data; import lombok.NoArgsConstructor; @@ -29,13 +30,13 @@ public class ProcessorTrackerStatusReportReq { /** * 本机地址 */ - private String ip; + private String address; public ProcessorTrackerStatusReportReq(Long instanceId, long remainTaskNum) { this.instanceId = instanceId; this.remainTaskNum = remainTaskNum; this.time = System.currentTimeMillis(); - this.ip = NetUtils.getLocalHost(); + this.address = OhMyWorker.getWorkerAddress(); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java index f6b00dc6..3d26c306 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.worker.pojo.request; import com.github.kfcfans.common.utils.NetUtils; +import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; import lombok.Getter; @@ -31,9 +32,12 @@ public class TaskTrackerStartTaskReq implements Serializable { private int taskCurrentRetryNums; + /** + * 创建 TaskTrackerStartTaskReq,该构造方法必须在 TaskTracker 节点调用 + */ public TaskTrackerStartTaskReq(InstanceInfo instanceInfo, TaskDO task) { - this.taskTrackerAddress = NetUtils.getLocalHost(); + this.taskTrackerAddress = OhMyWorker.getWorkerAddress(); this.instanceInfo = instanceInfo; this.taskId = task.getTaskId(); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java index e8a64c6e..9eff46ac 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java @@ -57,7 +57,7 @@ public abstract class MapReduceProcessor implements BasicProcessor { // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) boolean requestSucceed = false; try { - String akkaRemotePath = AkkaUtils.getAkkaRemotePath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME); ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); CompletionStage requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS)); AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java index 6a51843b..1eae22eb 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java @@ -36,7 +36,7 @@ public class ProcessorTrackerTest { worker.init(); ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); - String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost(), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); remoteProcessorTracker = testAS.actorSelection(akkaRemotePath); } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java index cca7d8a2..3302171c 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java @@ -36,21 +36,26 @@ public class TaskTrackerTest { worker.init(); ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); - String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), RemoteConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":25520", RemoteConstant.Task_TRACKER_ACTOR_NAME); remoteTaskTracker = testAS.actorSelection(akkaRemotePath); } + @Test + public void justStartWorkerToTestServer() throws Exception { + Thread.sleep(277277277); + } + @Test public void testStandaloneJob() throws Exception { remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.STANDALONE), null); - Thread.sleep(500000); + Thread.sleep(5000000); } @Test public void testMapReduceJob() throws Exception { remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.MAP_REDUCE), null); - Thread.sleep(500000); + Thread.sleep(5000000); } private static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType) { @@ -58,7 +63,7 @@ public class TaskTrackerTest { req.setJobId(1L); req.setInstanceId(10086L); - req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost())); + req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":25520")); req.setJobParams("this is job Params"); req.setInstanceParams("this is instance Params");