diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java index 7fe1fe1f..527cb7d7 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java @@ -21,6 +21,8 @@ import com.github.kfcfans.powerjob.worker.background.ServerDiscoveryService; import com.github.kfcfans.powerjob.worker.background.WorkerHealthReporter; import com.github.kfcfans.powerjob.worker.common.OhMyConfig; import com.github.kfcfans.powerjob.worker.common.PowerBannerPrinter; +import com.github.kfcfans.powerjob.worker.common.RuntimeMeta; +import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils; import com.github.kfcfans.powerjob.worker.common.utils.SpringUtils; import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService; import com.google.common.base.Stopwatch; @@ -28,14 +30,12 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; -import org.springframework.util.StringUtils; import java.util.Map; import java.util.Objects; @@ -53,17 +53,8 @@ import java.util.concurrent.TimeUnit; @Slf4j public class OhMyWorker implements ApplicationContextAware, InitializingBean, DisposableBean { - @Getter - private static OhMyConfig config; - @Getter - private static String currentServer; - @Getter - private static String workerAddress; - - public static ActorSystem actorSystem; - @Getter - private static Long appId; - private static ScheduledExecutorService timingPool; + private ScheduledExecutorService timingPool; + private final RuntimeMeta runtimeMeta = new RuntimeMeta(); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { @@ -79,33 +70,54 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di Stopwatch stopwatch = Stopwatch.createStarted(); log.info("[OhMyWorker] start to initialize OhMyWorker..."); + + OhMyConfig config = runtimeMeta.getOhMyConfig(); + CommonUtils.requireNonNull(config, "can't find OhMyConfig, please set OhMyConfig first"); + try { PowerBannerPrinter.print(); // 校验 appName if (!config.isEnableTestMode()) { - appId = assertAppName(); + assertAppName(); }else { log.warn("[OhMyWorker] using TestMode now, it's dangerous if this is production env."); } + // 初始化文件系统 + OmsWorkerFileUtils.init(runtimeMeta.getOhMyConfig()); + + // 初始化元数据 + String workerAddress = NetUtils.getLocalHost() + ":" + config.getPort(); + runtimeMeta.setWorkerAddress(workerAddress); + + // 初始化定时线程池 + ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build(); + timingPool = Executors.newScheduledThreadPool(3, timingPoolFactory); + + // 连接 server + ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(runtimeMeta.getAppId(), runtimeMeta.getOhMyConfig()); + serverDiscoveryService.start(timingPool); + runtimeMeta.setServerDiscoveryService(serverDiscoveryService); + // 初始化 ActorSystem(macOS上 new ServerSocket 检测端口占用的方法并不生效,可能是AKKA是Scala写的缘故?没办法...只能靠异常重试了) Map overrideConfig = Maps.newHashMap(); overrideConfig.put("akka.remote.artery.canonical.hostname", NetUtils.getLocalHost()); overrideConfig.put("akka.remote.artery.canonical.port", config.getPort()); - workerAddress = NetUtils.getLocalHost() + ":" + config.getPort(); Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME); Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); int cores = Runtime.getRuntime().availableProcessors(); - actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig); - actorSystem.actorOf(Props.create(TaskTrackerActor.class) + ActorSystem actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig); + runtimeMeta.setActorSystem(actorSystem); + + ActorRef taskTrackerActorRef = actorSystem.actorOf(TaskTrackerActor.props(runtimeMeta) .withDispatcher("akka.task-tracker-dispatcher") .withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME); - actorSystem.actorOf(Props.create(ProcessorTrackerActor.class) + actorSystem.actorOf(ProcessorTrackerActor.props(runtimeMeta) .withDispatcher("akka.processor-tracker-dispatcher") .withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); - actorSystem.actorOf(Props.create(WorkerActor.class) + actorSystem.actorOf(WorkerActor.props(taskTrackerActorRef) .withDispatcher("akka.worker-common-dispatcher") .withRouter(new RoundRobinPool(cores)), RemoteConstant.WORKER_ACTOR_NAME); @@ -116,23 +128,19 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress); log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem); + // 初始化日志系统 + OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, actorSystem, serverDiscoveryService); + runtimeMeta.setOmsLogHandler(omsLogHandler); + // 初始化存储 - TaskPersistenceService.INSTANCE.init(); + TaskPersistenceService taskPersistenceService = new TaskPersistenceService(runtimeMeta.getOhMyConfig().getStoreStrategy()); + taskPersistenceService.init(); + runtimeMeta.setTaskPersistenceService(taskPersistenceService); log.info("[OhMyWorker] local storage initialized successfully."); - // 服务发现 - currentServer = ServerDiscoveryService.discovery(); - if (StringUtils.isEmpty(currentServer) && !config.isEnableTestMode()) { - throw new RuntimeException("can't find any available server, this worker has been quarantined."); - } - log.info("[OhMyWorker] discovery server succeed, current server is {}.", currentServer); - // 初始化定时任务 - ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build(); - timingPool = Executors.newScheduledThreadPool(3, timingPoolFactory); - timingPool.scheduleAtFixedRate(new WorkerHealthReporter(), 0, 15, TimeUnit.SECONDS); - timingPool.scheduleAtFixedRate(() -> currentServer = ServerDiscoveryService.discovery(), 10, 10, TimeUnit.SECONDS); - timingPool.scheduleWithFixedDelay(OmsLogHandler.INSTANCE.logSubmitter, 0, 5, TimeUnit.SECONDS); + timingPool.scheduleAtFixedRate(new WorkerHealthReporter(runtimeMeta), 0, 15, TimeUnit.SECONDS); + timingPool.scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS); log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch); }catch (Exception e) { @@ -142,12 +150,13 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di } public void setConfig(OhMyConfig config) { - OhMyWorker.config = config; + runtimeMeta.setOhMyConfig(config); } @SuppressWarnings("rawtypes") - private Long assertAppName() { + private void assertAppName() { + OhMyConfig config = runtimeMeta.getOhMyConfig(); String appName = config.getAppName(); Objects.requireNonNull(appName, "appName can't be empty!"); @@ -160,7 +169,8 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di if (resultDTO.isSuccess()) { Long appId = Long.valueOf(resultDTO.getData().toString()); log.info("[OhMyWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId); - return appId; + runtimeMeta.setAppId(appId); + return; }else { log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName); throw new PowerJobException(resultDTO.getMessage()); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java index e30779cb..53f55a28 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java @@ -1,11 +1,14 @@ package com.github.kfcfans.powerjob.worker.actors; import akka.actor.AbstractActor; +import akka.actor.Props; +import com.github.kfcfans.powerjob.worker.common.RuntimeMeta; import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker; import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTrackerPool; import com.github.kfcfans.powerjob.worker.persistence.TaskDO; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; @@ -18,8 +21,15 @@ import java.util.List; * @since 2020/3/17 */ @Slf4j +@AllArgsConstructor public class ProcessorTrackerActor extends AbstractActor { + private final RuntimeMeta runtimeMeta; + + public static Props props(RuntimeMeta runtimeMeta) { + return Props.create(ProcessorTrackerActor.class, () -> new ProcessorTrackerActor(runtimeMeta)); + } + @Override public Receive createReceive() { return receiveBuilder() @@ -38,7 +48,10 @@ public class ProcessorTrackerActor extends AbstractActor { Long instanceId = req.getInstanceInfo().getInstanceId(); // 创建 ProcessorTracker 一定能成功 - ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, req.getTaskTrackerAddress(), () -> new ProcessorTracker(req)); + ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker( + instanceId, + req.getTaskTrackerAddress(), + () -> new ProcessorTracker(req, runtimeMeta)); TaskDO task = new TaskDO(); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java index 58893603..8b5c9bb6 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java @@ -1,10 +1,12 @@ package com.github.kfcfans.powerjob.worker.actors; import akka.actor.AbstractActor; +import akka.actor.Props; import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq; +import com.github.kfcfans.powerjob.worker.common.RuntimeMeta; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool; @@ -14,6 +16,7 @@ import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatus import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.google.common.collect.Lists; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.util.List; @@ -25,8 +28,15 @@ import java.util.List; * @since 2020/3/17 */ @Slf4j +@AllArgsConstructor public class TaskTrackerActor extends AbstractActor { + private final RuntimeMeta runtimeMeta; + + public static Props props(RuntimeMeta runtimeMeta) { + return Props.create(TaskTrackerActor.class, () -> new TaskTrackerActor(runtimeMeta)); + } + @Override public Receive createReceive() { return receiveBuilder() @@ -123,7 +133,7 @@ public class TaskTrackerActor extends AbstractActor { log.debug("[TaskTrackerActor] server schedule job by request: {}.", req); // 原子创建,防止多实例的存在 - TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> TaskTracker.create(req)); + TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> TaskTracker.create(req, runtimeMeta)); } /** diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/WorkerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/WorkerActor.java index c10aed00..3a44e555 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/WorkerActor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/WorkerActor.java @@ -1,9 +1,13 @@ package com.github.kfcfans.powerjob.worker.actors; import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Props; import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest; import com.github.kfcfans.powerjob.common.request.ServerDestroyContainerRequest; +import com.github.kfcfans.powerjob.worker.common.RuntimeMeta; import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; /** @@ -13,8 +17,15 @@ import lombok.extern.slf4j.Slf4j; * @since 2020/3/24 */ @Slf4j +@AllArgsConstructor public class WorkerActor extends AbstractActor { + private final ActorRef taskTrackerActorRef; + + public static Props props(ActorRef taskTrackerActorRef) { + return Props.create(WorkerActor.class, () -> new WorkerActor(taskTrackerActorRef)); + } + @Override public Receive createReceive() { return receiveBuilder() diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/OmsLogHandler.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/OmsLogHandler.java index 40aed030..c58c16e2 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/OmsLogHandler.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/OmsLogHandler.java @@ -1,11 +1,11 @@ package com.github.kfcfans.powerjob.worker.background; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; import com.github.kfcfans.powerjob.common.LogLevel; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.model.InstanceLogContent; import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq; -import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.google.common.collect.Lists; import com.google.common.collect.Queues; @@ -27,23 +27,28 @@ import java.util.concurrent.locks.ReentrantLock; @Slf4j public class OmsLogHandler { - private OmsLogHandler() { - } + private final String workerAddress; + private final ActorSystem actorSystem; + private final ServerDiscoveryService serverDiscoveryService; - // 单例 - public static final OmsLogHandler INSTANCE = new OmsLogHandler(); - // 生产者消费者模式,异步上传日志 - private final BlockingQueue logQueue = Queues.newLinkedBlockingQueue(); // 处理线程,需要通过线程池启动 public final Runnable logSubmitter = new LogSubmitter(); // 上报锁,只需要一个线程上报即可 private final Lock reportLock = new ReentrantLock(); + // 生产者消费者模式,异步上传日志 + private final BlockingQueue logQueue = Queues.newLinkedBlockingQueue(); // 每次上报携带的数据条数 private static final int BATCH_SIZE = 20; // 本地囤积阈值 private static final int REPORT_SIZE = 1024; + public OmsLogHandler(String workerAddress, ActorSystem actorSystem, ServerDiscoveryService serverDiscoveryService) { + this.workerAddress = workerAddress; + this.actorSystem = actorSystem; + this.serverDiscoveryService = serverDiscoveryService; + } + /** * 提交日志 * @param instanceId 任务实例ID @@ -74,7 +79,7 @@ public class OmsLogHandler { try { - String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); + String serverPath = AkkaUtils.getServerActorPath(serverDiscoveryService.getCurrentServerAddress()); // 当前无可用 Server if (StringUtils.isEmpty(serverPath)) { if (!logQueue.isEmpty()) { @@ -84,7 +89,7 @@ public class OmsLogHandler { return; } - ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); + ActorSelection serverActor = actorSystem.actorSelection(serverPath); List logs = Lists.newLinkedList(); while (!logQueue.isEmpty()) { @@ -93,7 +98,7 @@ public class OmsLogHandler { logs.add(logContent); if (logs.size() >= BATCH_SIZE) { - WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), Lists.newLinkedList(logs)); + WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs)); // 不可靠请求,WEB日志不追求极致 serverActor.tell(req, null); logs.clear(); @@ -105,7 +110,7 @@ public class OmsLogHandler { } if (!logs.isEmpty()) { - WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), logs); + WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs); serverActor.tell(req, null); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/ServerDiscoveryService.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/ServerDiscoveryService.java index 8dd22416..35c01ff2 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/ServerDiscoveryService.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/ServerDiscoveryService.java @@ -1,19 +1,22 @@ package com.github.kfcfans.powerjob.worker.background; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils; -import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.common.utils.HttpUtils; +import com.github.kfcfans.powerjob.worker.common.OhMyConfig; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.util.CollectionUtils; -import org.springframework.util.StringUtils; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * 服务发现 @@ -24,8 +27,13 @@ import java.util.Map; @Slf4j public class ServerDiscoveryService { - // 配置的可发起HTTP请求的Server(IP:Port) - private static final Map IP2ADDRESS = Maps.newHashMap(); + private final Long appId; + private final OhMyConfig config; + + private String currentServerAddress; + + private final Map ip2Address = Maps.newHashMap(); + // 服务发现地址 private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d¤tServer=%s&protocol=AKKA"; // 失败次数 @@ -33,27 +41,44 @@ public class ServerDiscoveryService { // 最大失败次数 private static final int MAX_FAILED_COUNT = 3; + public ServerDiscoveryService(Long appId, OhMyConfig config) { + this.appId = appId; + this.config = config; + } - public static String discovery() { + public void start(ScheduledExecutorService timingPool) { + this.currentServerAddress = discovery(); + if (org.springframework.util.StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) { + throw new PowerJobException("can't find any available server, this worker has been quarantined."); + } + timingPool.scheduleAtFixedRate(() -> this.currentServerAddress = discovery(), 10, 10, TimeUnit.SECONDS); + } - if (IP2ADDRESS.isEmpty()) { - OhMyWorker.getConfig().getServerAddress().forEach(x -> IP2ADDRESS.put(x.split(":")[0], x)); + public String getCurrentServerAddress() { + return currentServerAddress; + } + + + private String discovery() { + + if (ip2Address.isEmpty()) { + config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x)); } String result = null; // 先对当前机器发起请求 - String currentServer = OhMyWorker.getCurrentServer(); + String currentServer = currentServerAddress; if (!StringUtils.isEmpty(currentServer)) { String ip = currentServer.split(":")[0]; // 直接请求当前Server的HTTP服务,可以少一次网络开销,减轻Server负担 - String firstServerAddress = IP2ADDRESS.get(ip); + String firstServerAddress = ip2Address.get(ip); if (firstServerAddress != null) { result = acquire(firstServerAddress); } } - for (String httpServerAddress : OhMyWorker.getConfig().getServerAddress()) { + for (String httpServerAddress : config.getServerAddress()) { if (StringUtils.isEmpty(result)) { result = acquire(httpServerAddress); }else { @@ -62,36 +87,36 @@ public class ServerDiscoveryService { } if (StringUtils.isEmpty(result)) { - log.warn("[OmsServerDiscovery] can't find any available server, this worker has been quarantined."); + log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined."); // 在 Server 高可用的前提下,连续失败多次,说明该节点与外界失联,Server已经将秒级任务转移到其他Worker,需要杀死本地的任务 if (FAILED_COUNT++ > MAX_FAILED_COUNT) { - log.warn("[OmsServerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker."); + log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker."); List frequentInstanceIds = TaskTrackerPool.getAllFrequentTaskTrackerKeys(); if (!CollectionUtils.isEmpty(frequentInstanceIds)) { frequentInstanceIds.forEach(instanceId -> { TaskTracker taskTracker = TaskTrackerPool.remove(instanceId); taskTracker.destroy(); - log.warn("[OmsServerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId); + log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId); }); } FAILED_COUNT = 0; } return null; - }else { + } else { // 重置失败次数 FAILED_COUNT = 0; - log.debug("[OmsServerDiscovery] current server is {}.", result); + log.debug("[PowerDiscovery] current server is {}.", result); return result; } } @SuppressWarnings("rawtypes") - private static String acquire(String httpServerAddress) { + private String acquire(String httpServerAddress) { String result = null; - String url = String.format(DISCOVERY_URL, httpServerAddress, OhMyWorker.getAppId(), OhMyWorker.getCurrentServer()); + String url = String.format(DISCOVERY_URL, httpServerAddress, appId, currentServerAddress); try { result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url)); }catch (Exception ignore) { diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/WorkerHealthReporter.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/WorkerHealthReporter.java index d50f6e75..1e361d97 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/WorkerHealthReporter.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/WorkerHealthReporter.java @@ -5,8 +5,8 @@ import com.github.kfcfans.powerjob.common.Protocol; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.model.SystemMetrics; import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat; -import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.PowerJobWorkerVersion; +import com.github.kfcfans.powerjob.worker.common.RuntimeMeta; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.SystemInfoUtils; import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory; @@ -24,29 +24,31 @@ import org.springframework.util.StringUtils; @AllArgsConstructor public class WorkerHealthReporter implements Runnable { + private final RuntimeMeta runtimeMeta; + @Override public void run() { // 没有可用Server,无法上报 - String currentServer = OhMyWorker.getCurrentServer(); + String currentServer = runtimeMeta.getServerDiscoveryService().getCurrentServerAddress(); if (StringUtils.isEmpty(currentServer)) { return; } SystemMetrics systemMetrics; - if (OhMyWorker.getConfig().getSystemMetricsCollector() == null) { + if (runtimeMeta.getOhMyConfig().getSystemMetricsCollector() == null) { systemMetrics = SystemInfoUtils.getSystemMetrics(); } else { - systemMetrics = OhMyWorker.getConfig().getSystemMetricsCollector().collect(); + systemMetrics = runtimeMeta.getOhMyConfig().getSystemMetricsCollector().collect(); } WorkerHeartbeat heartbeat = new WorkerHeartbeat(); heartbeat.setSystemMetrics(systemMetrics); - heartbeat.setWorkerAddress(OhMyWorker.getWorkerAddress()); - heartbeat.setAppName(OhMyWorker.getConfig().getAppName()); - heartbeat.setAppId(OhMyWorker.getAppId()); + heartbeat.setWorkerAddress(runtimeMeta.getWorkerAddress()); + heartbeat.setAppName(runtimeMeta.getOhMyConfig().getAppName()); + heartbeat.setAppId(runtimeMeta.getAppId()); heartbeat.setHeartbeatTime(System.currentTimeMillis()); heartbeat.setVersion(PowerJobWorkerVersion.getVersion()); heartbeat.setProtocol(Protocol.AKKA.name()); @@ -56,11 +58,11 @@ public class WorkerHealthReporter implements Runnable { heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos()); // 发送请求 - String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); + String serverPath = AkkaUtils.getServerActorPath(currentServer); if (StringUtils.isEmpty(serverPath)) { return; } - ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(serverPath); + ActorSelection actorSelection = runtimeMeta.getActorSystem().actorSelection(serverPath); actorSelection.tell(heartbeat, null); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/RuntimeMeta.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/RuntimeMeta.java new file mode 100644 index 00000000..962740b3 --- /dev/null +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/RuntimeMeta.java @@ -0,0 +1,28 @@ +package com.github.kfcfans.powerjob.worker.common; + +import akka.actor.ActorSystem; +import com.github.kfcfans.powerjob.worker.background.OmsLogHandler; +import com.github.kfcfans.powerjob.worker.background.ServerDiscoveryService; +import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService; +import lombok.Data; + +/** + * store worker's runtime meta info + * + * @author tjq + * @since 2021/3/7 + */ +@Data +public class RuntimeMeta { + + private Long appId; + + private OhMyConfig ohMyConfig; + + private String workerAddress; + + private ActorSystem actorSystem; + private OmsLogHandler omsLogHandler; + private ServerDiscoveryService serverDiscoveryService; + private TaskPersistenceService taskPersistenceService; +} diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/ThreadLocalStore.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/ThreadLocalStore.java index 73038ea1..35e3ad7f 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/ThreadLocalStore.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/ThreadLocalStore.java @@ -15,6 +15,8 @@ public class ThreadLocalStore { private static final ThreadLocal TASK_THREAD_LOCAL = new ThreadLocal<>(); + private static final ThreadLocal RUNTIME_META_LOCAL = new ThreadLocal<>(); + private static final ThreadLocal TASK_ID_THREAD_LOCAL = new ThreadLocal<>(); @@ -26,6 +28,14 @@ public class ThreadLocalStore { TASK_THREAD_LOCAL.set(task); } + public static RuntimeMeta getRuntimeMeta() { + return RUNTIME_META_LOCAL.get(); + } + + public static void setRuntimeMeta(RuntimeMeta runtimeMeta) { + RUNTIME_META_LOCAL.set(runtimeMeta); + } + public static AtomicLong getTaskIDAddr() { if (TASK_ID_THREAD_LOCAL.get() == null) { TASK_ID_THREAD_LOCAL.set(new AtomicLong(0)); @@ -35,6 +45,7 @@ public class ThreadLocalStore { public static void clear() { TASK_ID_THREAD_LOCAL.remove(); + RUNTIME_META_LOCAL.remove(); TASK_THREAD_LOCAL.remove(); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java index efb465df..b27e3f48 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java @@ -31,11 +31,11 @@ public class AkkaUtils { return String.format(AKKA_NODE_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, actorName); } - public static String getAkkaServerPath(String actorName) { - if (StringUtils.isEmpty(OhMyWorker.getCurrentServer())) { + public static String getServerActorPath(String serverAddress) { + if (StringUtils.isEmpty(serverAddress)) { return null; } - return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName); + return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, serverAddress, RemoteConstant.SERVER_ACTOR_NAME); } /** diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/OmsWorkerFileUtils.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/OmsWorkerFileUtils.java index 668238f5..a1cee721 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/OmsWorkerFileUtils.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/OmsWorkerFileUtils.java @@ -2,6 +2,8 @@ package com.github.kfcfans.powerjob.worker.common.utils; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.worker.OhMyWorker; +import com.github.kfcfans.powerjob.worker.common.OhMyConfig; +import lombok.extern.slf4j.Slf4j; /** * 文件工具类 @@ -9,20 +11,26 @@ import com.github.kfcfans.powerjob.worker.OhMyWorker; * @author tjq * @since 2020/5/16 */ +@Slf4j public class OmsWorkerFileUtils { - private static final String USER_HOME = System.getProperty("user.home", "powerjob"); - private static final String WORKER_DIR = USER_HOME + "/powerjob/" + OhMyWorker.getConfig().getAppName() + "/"; + private static String basePath; + + public static void init(OhMyConfig config) { + String userHome = System.getProperty("user.home", "powerjob"); + basePath = userHome + "/powerjob/" + config.getAppName() + "/"; + log.info("[PowerFile] use base file path: {}", basePath); + } public static String getScriptDir() { - return WORKER_DIR + "script/"; + return basePath + "script/"; } public static String getContainerDir() { - return WORKER_DIR + "container/"; + return basePath + "container/"; } public static String getH2WorkDir() { - return WORKER_DIR + "h2/" + CommonUtils.genUUID() + "/"; + return basePath + "h2/" + CommonUtils.genUUID() + "/"; } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/WorkflowContextUtils.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/WorkflowContextUtils.java index 24a9468e..d3cfba0f 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/WorkflowContextUtils.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/WorkflowContextUtils.java @@ -18,16 +18,15 @@ public class WorkflowContextUtils { } - public static boolean isExceededLengthLimit(Map appendedWfContext) { + public static boolean isExceededLengthLimit(Map appendedWfContext, int maxLength) { String jsonString = JsonUtils.toJSONString(appendedWfContext); if (jsonString == null) { // impossible return true; } - int maxAppendedWfContextLength = OhMyWorker.getConfig().getMaxAppendedWfContextLength(); - return maxAppendedWfContextLength < jsonString.length(); + return maxLength < jsonString.length(); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsContainerFactory.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsContainerFactory.java index a8640054..ecace797 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsContainerFactory.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsContainerFactory.java @@ -39,17 +39,17 @@ public class OmsContainerFactory { /** * 获取容器 * @param containerId 容器ID - * @param loadFromServer 当本地不存在时尝试从 server 加载 + * @param serverActor 当容器不存在且 serverActor 非空时,尝试从服务端重新拉取容器 * @return 容器示例,可能为 null */ - public static OmsContainer fetchContainer(Long containerId, boolean loadFromServer) { + public static OmsContainer fetchContainer(Long containerId, ActorSelection serverActor) { OmsContainer omsContainer = CARGO.get(containerId); if (omsContainer != null) { return omsContainer; } - if (!loadFromServer) { + if (serverActor == null) { return null; } @@ -57,11 +57,6 @@ public class OmsContainerFactory { log.info("[OmsContainer-{}] can't find the container in factory, try to deploy from server.", containerId); WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(containerId); - String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); - if (StringUtils.isEmpty(serverPath)) { - return null; - } - ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); try { CompletionStage askCS = Patterns.ask(serverActor, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java index 8efe4adf..20c710bc 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java @@ -190,7 +190,7 @@ public class OmsJarContainer implements OmsContainer { // 需要满足的条件:引用计数器减为0 & 有更新的容器出现 if (referenceCount.decrementAndGet() <= 0) { - OmsContainer container = OmsContainerFactory.fetchContainer(containerId, false); + OmsContainer container = OmsContainerFactory.fetchContainer(containerId, null); if (container != this) { try { destroy(); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java index 0b7dfd84..bdfba539 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java @@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.worker.core.executor; import akka.actor.ActorSelection; import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.worker.OhMyWorker; +import com.github.kfcfans.powerjob.worker.common.RuntimeMeta; import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; @@ -57,6 +58,7 @@ public class ProcessorRunnable implements Runnable { * 重试队列,ProcessorTracker 将会定期重新上报处理结果 */ private final Queue statusReportRetryQueue; + private final RuntimeMeta runtimeMeta; public void innerRun() throws InterruptedException { @@ -65,6 +67,8 @@ public class ProcessorRunnable implements Runnable { log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName()); ThreadLocalStore.setTask(task); + ThreadLocalStore.setRuntimeMeta(runtimeMeta); + // 0. 构造任务上下文 WorkflowContext workflowContext = constructWorkflowContext(); TaskContext taskContext = constructTaskContext(); @@ -113,7 +117,7 @@ public class ProcessorRunnable implements Runnable { if (task.getTaskContent() != null && task.getTaskContent().length > 0) { taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent())); } - taskContext.setUserContext(OhMyWorker.getConfig().getUserContext()); + taskContext.setUserContext(runtimeMeta.getOhMyConfig().getUserContext()); return taskContext; } @@ -131,7 +135,7 @@ public class ProcessorRunnable implements Runnable { Stopwatch stopwatch = Stopwatch.createStarted(); log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId); - List taskResults = TaskPersistenceService.INSTANCE.getAllTaskResult(instanceId, task.getSubInstanceId()); + List taskResults = runtimeMeta.getTaskPersistenceService().getAllTaskResult(instanceId, task.getSubInstanceId()); try { switch (executeType) { case BROADCAST: @@ -209,8 +213,8 @@ public class ProcessorRunnable implements Runnable { req.setReportTime(System.currentTimeMillis()); req.setCmd(cmd); // 检查追加的上下文大小是否超出限制 - if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext)) { - log.warn("[ProcessorRunnable-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(),OhMyWorker.getConfig().getMaxAppendedWfContextLength()); + if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, runtimeMeta.getOhMyConfig().getMaxAppendedWfContextLength())) { + log.warn("[ProcessorRunnable-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(), runtimeMeta.getOhMyConfig().getMaxAppendedWfContextLength()); // ignore appended workflow context data appendedWfContext = Collections.emptyMap(); } @@ -254,7 +258,7 @@ public class ProcessorRunnable implements Runnable { if (StringUtils.isEmpty(result)) { return ""; } - final int maxLength = OhMyWorker.getConfig().getMaxResultLength(); + final int maxLength = runtimeMeta.getOhMyConfig().getMaxResultLength(); if (result.length() <= maxLength) { return result; } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java index 6caa6d01..0ed81d0b 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java @@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.worker.core.processor.sdk; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.worker.OhMyWorker; +import com.github.kfcfans.powerjob.worker.common.RuntimeMeta; import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; @@ -47,13 +48,14 @@ public abstract class MapProcessor implements BasicProcessor { } TaskDO task = ThreadLocalStore.getTask(); + RuntimeMeta runtimeMeta = ThreadLocalStore.getRuntimeMeta(); // 1. 构造请求 ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName); // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME); - boolean requestSucceed = AkkaUtils.reliableTransmit(OhMyWorker.actorSystem.actorSelection(akkaRemotePath), req); + boolean requestSucceed = AkkaUtils.reliableTransmit(runtimeMeta.getActorSystem().actorSelection(akkaRemotePath), req); if (requestSucceed) { return new ProcessResult(true, "MAP_SUCCESS"); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 95258ecc..07779815 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.worker.core.tracker.processor; import akka.actor.ActorSelection; import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.utils.CommonUtils; -import com.github.kfcfans.powerjob.worker.OhMyWorker; +import com.github.kfcfans.powerjob.worker.common.RuntimeMeta; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.SpringUtils; @@ -42,6 +42,7 @@ public class ProcessorTracker { * 记录创建时间 */ private long startTime; + private RuntimeMeta runtimeMeta; /** * 任务实例信息 */ @@ -99,17 +100,18 @@ public class ProcessorTracker { * 创建 ProcessorTracker(其实就是创建了个执行用的线程池 T_T) */ @SuppressWarnings("squid:S1181") - public ProcessorTracker(TaskTrackerStartTaskReq request) { + public ProcessorTracker(TaskTrackerStartTaskReq request, RuntimeMeta runtimeMeta) { try { // 赋值 this.startTime = System.currentTimeMillis(); + this.runtimeMeta = runtimeMeta; this.instanceInfo = request.getInstanceInfo(); this.instanceId = request.getInstanceInfo().getInstanceId(); this.taskTrackerAddress = request.getTaskTrackerAddress(); String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME); - this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); + this.taskTrackerActorRef = runtimeMeta.getActorSystem().actorSelection(akkaRemotePath); - this.omsLogger = new OmsServerLogger(instanceId); + this.omsLogger = new OmsServerLogger(instanceId, runtimeMeta.getOmsLogHandler()); this.statusReportRetryQueue = Queues.newLinkedBlockingQueue(); this.lastIdleTime = -1L; this.lastCompletedTaskCount = 0L; @@ -162,7 +164,7 @@ public class ProcessorTracker { newTask.setAddress(taskTrackerAddress); ClassLoader classLoader = omsContainer == null ? getClass().getClassLoader() : omsContainer.getContainerClassLoader(); - ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader, statusReportRetryQueue); + ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader, statusReportRetryQueue, runtimeMeta); try { threadPool.submit(processorRunnable); success = true; @@ -284,7 +286,9 @@ public class ProcessorTracker { log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime); // 不可靠通知,如果该请求失败,则整个任务处理集群缺失一个 ProcessorTracker,影响可接受 - taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildIdleReport(instanceId), null); + ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId); + statusReportReq.setAddress(runtimeMeta.getWorkerAddress()); + taskTrackerActorRef.tell(statusReportReq, null); destroy(); return; } @@ -306,7 +310,9 @@ public class ProcessorTracker { // 上报当前 ProcessorTracker 负载 long waitingNum = threadPool.getQueue().size(); - taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum), null); + ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum); + statusReportReq.setAddress(runtimeMeta.getWorkerAddress()); + taskTrackerActorRef.tell(statusReportReq, null); log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum); } @@ -339,7 +345,9 @@ public class ProcessorTracker { String[] split = processorInfo.split("#"); log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", instanceId, split[1], split[0]); - omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), true); + String serverPath = AkkaUtils.getServerActorPath(runtimeMeta.getServerDiscoveryService().getCurrentServerAddress()); + ActorSelection actorSelection = runtimeMeta.getActorSystem().actorSelection(serverPath); + omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), actorSelection); if (omsContainer != null) { processor = omsContainer.getProcessor(split[1]); } else { diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index b6f4777f..6ae435e4 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -8,6 +8,7 @@ import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.OhMyWorker; +import com.github.kfcfans.powerjob.worker.common.RuntimeMeta; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; @@ -49,8 +50,8 @@ public class CommonTaskTracker extends TaskTracker { private static final int MAX_REPORT_FAILED_THRESHOLD = 5; - protected CommonTaskTracker(ServerScheduleJobReq req) { - super(req); + protected CommonTaskTracker(ServerScheduleJobReq req, RuntimeMeta runtimeMeta) { + super(req, runtimeMeta); } @Override @@ -85,7 +86,7 @@ public class CommonTaskTracker extends TaskTracker { // 填充基础信息 detail.setActualTriggerTime(createTime); detail.setStatus(InstanceStatus.RUNNING.getV()); - detail.setTaskTrackerAddress(OhMyWorker.getWorkerAddress()); + detail.setTaskTrackerAddress(runtimeMeta.getWorkerAddress()); // 填充详细信息 InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId); @@ -116,7 +117,7 @@ public class CommonTaskTracker extends TaskTracker { rootTask.setInstanceId(instanceInfo.getInstanceId()); rootTask.setTaskId(ROOT_TASK_ID); rootTask.setFailedCnt(0); - rootTask.setAddress(OhMyWorker.getWorkerAddress()); + rootTask.setAddress(runtimeMeta.getWorkerAddress()); rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME); rootTask.setCreatedTime(System.currentTimeMillis()); rootTask.setLastModifiedTime(System.currentTimeMillis()); @@ -158,7 +159,7 @@ public class CommonTaskTracker extends TaskTracker { req.setFailedTaskNum(holder.failedNum); req.setReportTime(System.currentTimeMillis()); req.setStartTime(createTime); - req.setSourceAddress(OhMyWorker.getWorkerAddress()); + req.setSourceAddress(runtimeMeta.getWorkerAddress()); boolean success = false; String result = null; @@ -217,7 +218,7 @@ public class CommonTaskTracker extends TaskTracker { newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME); newLastTask.setTaskId(LAST_TASK_ID); newLastTask.setSubInstanceId(instanceId); - newLastTask.setAddress(OhMyWorker.getWorkerAddress()); + newLastTask.setAddress(runtimeMeta.getWorkerAddress()); submitTask(Lists.newArrayList(newLastTask)); } } @@ -231,8 +232,8 @@ public class CommonTaskTracker extends TaskTracker { result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT; } - String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); - ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); + String serverPath = AkkaUtils.getServerActorPath(runtimeMeta.getServerDiscoveryService().getCurrentServerAddress()); + ActorSelection serverActor = runtimeMeta.getActorSystem().actorSelection(serverPath); // 4. 执行完毕,报告服务器 if (finished.get()) { diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index bfaad5c3..6c1ced06 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.powerjob.worker.OhMyWorker; +import com.github.kfcfans.powerjob.worker.common.RuntimeMeta; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; @@ -62,8 +63,8 @@ public class FrequentTaskTracker extends TaskTracker { private static final String LAST_TASK_ID_PREFIX = "L"; private static final int MIN_INTERVAL = 50; - protected FrequentTaskTracker(ServerScheduleJobReq req) { - super(req); + protected FrequentTaskTracker(ServerScheduleJobReq req, RuntimeMeta runtimeMeta) { + super(req, runtimeMeta); } @Override @@ -112,7 +113,7 @@ public class FrequentTaskTracker extends TaskTracker { // 填充基础信息 detail.setActualTriggerTime(createTime); detail.setStatus(InstanceStatus.RUNNING.getV()); - detail.setTaskTrackerAddress(OhMyWorker.getWorkerAddress()); + detail.setTaskTrackerAddress(runtimeMeta.getWorkerAddress()); List history = Lists.newLinkedList(); recentSubInstanceInfo.forEach((subId, subInstanceInfo) -> { @@ -152,7 +153,7 @@ public class FrequentTaskTracker extends TaskTracker { subInstanceInfo.startTime = System.currentTimeMillis(); recentSubInstanceInfo.put(subInstanceId, subInstanceInfo); - String myAddress = OhMyWorker.getWorkerAddress(); + String myAddress = runtimeMeta.getWorkerAddress(); String taskId = String.valueOf(subInstanceId); TaskDO newRootTask = new TaskDO(); @@ -301,7 +302,7 @@ public class FrequentTaskTracker extends TaskTracker { newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME); newLastTask.setTaskId(LAST_TASK_ID_PREFIX + subInstanceId); newLastTask.setSubInstanceId(subInstanceId); - newLastTask.setAddress(OhMyWorker.getWorkerAddress()); + newLastTask.setAddress(runtimeMeta.getWorkerAddress()); submitTask(Lists.newArrayList(newLastTask)); } } @@ -313,7 +314,8 @@ public class FrequentTaskTracker extends TaskTracker { private void reportStatus() { - if (StringUtils.isEmpty(OhMyWorker.getCurrentServer())) { + String currentServerAddress = runtimeMeta.getServerDiscoveryService().getCurrentServerAddress(); + if (StringUtils.isEmpty(currentServerAddress)) { return; } @@ -327,14 +329,14 @@ public class FrequentTaskTracker extends TaskTracker { req.setTotalTaskNum(triggerTimes.get()); req.setSucceedTaskNum(succeedTimes.get()); req.setFailedTaskNum(failedTimes.get()); - req.setSourceAddress(OhMyWorker.getWorkerAddress()); + req.setSourceAddress(runtimeMeta.getWorkerAddress()); - String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); + String serverPath = AkkaUtils.getServerActorPath(currentServerAddress); if (StringUtils.isEmpty(serverPath)) { return; } // 非可靠通知,Server挂掉后任务的kill工作交由其他线程去做 - ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); + ActorSelection serverActor = runtimeMeta.getActorSystem().actorSelection(serverPath); serverActor.tell(req, null); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index 677e7587..eb42147c 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -15,6 +15,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.SegmentLock; import com.github.kfcfans.powerjob.worker.OhMyWorker; +import com.github.kfcfans.powerjob.worker.common.RuntimeMeta; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; @@ -59,6 +60,10 @@ public abstract class TaskTracker { * TaskTracker创建时间 */ protected final long createTime; + /** + * worker 运行时元数据 + */ + protected final RuntimeMeta runtimeMeta; /** * 任务实例ID,使用频率过高,从 InstanceInfo 提取出来单独保存一份 */ @@ -100,10 +105,11 @@ public abstract class TaskTracker { private final SegmentLock segmentLock; private static final int UPDATE_CONCURRENCY = 4; - protected TaskTracker(ServerScheduleJobReq req) { + protected TaskTracker(ServerScheduleJobReq req, RuntimeMeta runtimeMeta) { // 初始化成员变量 this.createTime = System.currentTimeMillis(); + this.runtimeMeta = runtimeMeta; this.instanceId = req.getInstanceId(); this.instanceInfo = new InstanceInfo(); BeanUtils.copyProperties(req, instanceInfo); @@ -118,7 +124,7 @@ public abstract class TaskTracker { instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency())); this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress()); - this.taskPersistenceService = TaskPersistenceService.INSTANCE; + this.taskPersistenceService = runtimeMeta.getTaskPersistenceService(); this.finished = new AtomicBoolean(false); // 只有工作流中的任务允许向工作流中追加上下文数据 this.appendedWfContext = req.getWfInstanceId() == null ? Collections.emptyMap() : Maps.newConcurrentMap(); @@ -140,15 +146,15 @@ public abstract class TaskTracker { * @param req 服务端调度任务请求 * @return API/CRON -> CommonTaskTracker, FIX_RATE/FIX_DELAY -> FrequentTaskTracker */ - public static TaskTracker create(ServerScheduleJobReq req) { + public static TaskTracker create(ServerScheduleJobReq req, RuntimeMeta runtimeMeta) { try { TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType()); switch (timeExpressionType) { case FIXED_RATE: case FIXED_DELAY: - return new FrequentTaskTracker(req); + return new FrequentTaskTracker(req, runtimeMeta); default: - return new CommonTaskTracker(req); + return new CommonTaskTracker(req, runtimeMeta); } } catch (Exception e) { log.warn("[TaskTracker-{}] create TaskTracker from request({}) failed.", req.getInstanceId(), req, e); @@ -160,10 +166,10 @@ public abstract class TaskTracker { response.setResult(String.format("init TaskTracker failed, reason: %s", e.toString())); response.setReportTime(System.currentTimeMillis()); response.setStartTime(System.currentTimeMillis()); - response.setSourceAddress(OhMyWorker.getWorkerAddress()); + response.setSourceAddress(runtimeMeta.getWorkerAddress()); - String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); - ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); + String serverPath = AkkaUtils.getServerActorPath(runtimeMeta.getServerDiscoveryService().getCurrentServerAddress()); + ActorSelection serverActor = runtimeMeta.getActorSystem().actorSelection(serverPath); serverActor.tell(response, null); } return null; @@ -185,8 +191,8 @@ public abstract class TaskTracker { return; } // 检查追加的上下文大小是否超出限制 - if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext)) { - log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(),OhMyWorker.getConfig().getMaxAppendedWfContextLength()); + if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, runtimeMeta.getOhMyConfig().getMaxAppendedWfContextLength())) { + log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(), runtimeMeta.getOhMyConfig().getMaxAppendedWfContextLength()); // ignore appended workflow context data return; } @@ -345,7 +351,7 @@ public abstract class TaskTracker { String idlePtAddress = heartbeatReq.getAddress(); // 该 ProcessorTracker 已销毁,重置为初始状态 ptStatusHolder.getProcessorTrackerStatus(idlePtAddress).setDispatched(false); - List unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress); + List unfinishedTask = taskPersistenceService.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress); if (!CollectionUtils.isEmpty(unfinishedTask)) { log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask); unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result")); @@ -404,7 +410,7 @@ public abstract class TaskTracker { stopRequest.setInstanceId(instanceId); ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> { String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); - ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptPath); + ActorSelection ptActor = runtimeMeta.getActorSystem().actorSelection(ptPath); // 不可靠通知,ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭 ptActor.tell(stopRequest, null); }); @@ -456,9 +462,9 @@ public abstract class TaskTracker { taskId2LastReportTime.put(task.getTaskId(), -1L); // 4. 任务派发 - TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task); + TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, runtimeMeta.getWorkerAddress()); String ptActorPath = AkkaUtils.getAkkaWorkerPath(processorTrackerAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); - ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath); + ActorSelection ptActor = runtimeMeta.getActorSystem().actorSelection(ptActorPath); ptActor.tell(startTaskReq, null); log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName()); @@ -549,13 +555,13 @@ public abstract class TaskTracker { protected class WorkerDetector implements Runnable { @Override public void run() { - String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); + String serverPath = AkkaUtils.getServerActorPath(runtimeMeta.getServerDiscoveryService().getCurrentServerAddress()); if (StringUtils.isEmpty(serverPath)) { log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId); return; } - WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(OhMyWorker.getAppId(), instanceInfo.getJobId()); - AskResponse response = AkkaUtils.easyAsk(OhMyWorker.actorSystem.actorSelection(serverPath), req); + WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(runtimeMeta.getAppId(), instanceInfo.getJobId()); + AskResponse response = AkkaUtils.easyAsk(runtimeMeta.getActorSystem().actorSelection(serverPath), req); if (!response.isSuccess()) { log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage()); return; diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/impl/OmsServerLogger.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/impl/OmsServerLogger.java index 38d7c1b7..73b7b232 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/impl/OmsServerLogger.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/impl/OmsServerLogger.java @@ -19,6 +19,7 @@ import org.slf4j.helpers.MessageFormatter; public class OmsServerLogger implements OmsLogger { private final long instanceId; + private final OmsLogHandler omsLogHandler; @Override public void debug(String messagePattern, Object... args) { @@ -59,7 +60,7 @@ public class OmsServerLogger implements OmsLogger { private void process(LogLevel level, String messagePattern, Object... args) { String logContent = genLogContent(messagePattern, args); - OmsLogHandler.INSTANCE.submitLog(instanceId, level, logContent); + omsLogHandler.submitLog(instanceId, level, logContent); } } \ No newline at end of file diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/ConnectionFactory.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/ConnectionFactory.java index 0603576a..7bd23325 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/ConnectionFactory.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/ConnectionFactory.java @@ -23,46 +23,37 @@ import java.sql.SQLException; @Slf4j public class ConnectionFactory { - private static volatile DataSource dataSource; + private volatile DataSource dataSource; - private static final String H2_PATH = OmsWorkerFileUtils.getH2WorkDir(); - private static final String DISK_JDBC_URL = String.format("jdbc:h2:file:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH); - private static final String MEMORY_JDBC_URL = String.format("jdbc:h2:mem:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH); + private final String H2_PATH = OmsWorkerFileUtils.getH2WorkDir(); + private final String DISK_JDBC_URL = String.format("jdbc:h2:file:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH); + private final String MEMORY_JDBC_URL = String.format("jdbc:h2:mem:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH); - public static Connection getConnection() throws SQLException { - return getDataSource().getConnection(); + public Connection getConnection() throws SQLException { + return dataSource.getConnection(); } - private static DataSource getDataSource() { - if (dataSource != null) { - return dataSource; + public synchronized void initDatasource(StoreStrategy strategy) { + // 兼容单元测试,否则没办法单独测试 DAO 层了 + strategy = strategy == null ? StoreStrategy.DISK : strategy; + + HikariConfig config = new HikariConfig(); + config.setDriverClassName(Driver.class.getName()); + config.setJdbcUrl(strategy == StoreStrategy.DISK ? DISK_JDBC_URL : MEMORY_JDBC_URL); + config.setAutoCommit(true); + // 池中最小空闲连接数量 + config.setMinimumIdle(2); + // 池中最大连接数量 + config.setMaximumPoolSize(32); + dataSource = new HikariDataSource(config); + + log.info("[PowerDatasource] init h2 datasource successfully, use url: {}", config.getJdbcUrl()); + + // JVM 关闭时删除数据库文件 + try { + FileUtils.forceDeleteOnExit(new File(H2_PATH)); + }catch (Exception ignore) { } - synchronized (ConnectionFactory.class) { - if (dataSource == null) { - - // 兼容单元测试,否则没办法单独测试 DAO 层了 - StoreStrategy strategy = OhMyWorker.getConfig() == null ? StoreStrategy.DISK : OhMyWorker.getConfig().getStoreStrategy(); - - HikariConfig config = new HikariConfig(); - config.setDriverClassName(Driver.class.getName()); - config.setJdbcUrl(strategy == StoreStrategy.DISK ? DISK_JDBC_URL : MEMORY_JDBC_URL); - config.setAutoCommit(true); - // 池中最小空闲连接数量 - config.setMinimumIdle(2); - // 池中最大连接数量 - config.setMaximumPoolSize(32); - dataSource = new HikariDataSource(config); - - log.info("[OmsDatasource] init h2 datasource successfully, use url: {}", config.getJdbcUrl()); - - // JVM 关闭时删除数据库文件 - try { - FileUtils.forceDeleteOnExit(new File(H2_PATH)); - }catch (Exception ignore) { - } - } - } - return dataSource; } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskDAOImpl.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskDAOImpl.java index a6fecde0..0c826296 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskDAOImpl.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskDAOImpl.java @@ -4,6 +4,7 @@ import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import lombok.AllArgsConstructor; import java.sql.*; import java.util.Collection; @@ -16,7 +17,10 @@ import java.util.Map; * @author tjq * @since 2020/3/17 */ +@AllArgsConstructor public class TaskDAOImpl implements TaskDAO { + + private final ConnectionFactory connectionFactory; @Override public void initTable() throws Exception { @@ -26,7 +30,7 @@ public class TaskDAOImpl implements TaskDAO { // bigint(20) 与 Java Long 取值范围完全一致 String createTableSQL = "create table task_info (task_id varchar(255), instance_id bigint(20), sub_instance_id bigint(20), task_name varchar(255), task_content blob, address varchar(255), status int(5), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), last_report_time bigint(20), unique KEY pkey (instance_id, task_id))"; - try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { + try (Connection conn = connectionFactory.getConnection(); Statement stat = conn.createStatement()) { stat.execute(delTableSQL); stat.execute(createTableSQL); } @@ -35,7 +39,7 @@ public class TaskDAOImpl implements TaskDAO { @Override public boolean save(TaskDO task) throws SQLException { String insertSQL = "insert into task_info(task_id, instance_id, sub_instance_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time, last_report_time) values (?,?,?,?,?,?,?,?,?,?,?,?)"; - try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { + try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { fillInsertPreparedStatement(task, ps); return ps.executeUpdate() == 1; } @@ -44,7 +48,7 @@ public class TaskDAOImpl implements TaskDAO { @Override public boolean batchSave(Collection tasks) throws SQLException { String insertSQL = "insert into task_info(task_id, instance_id, sub_instance_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time, last_report_time) values (?,?,?,?,?,?,?,?,?,?,?,?)"; - try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { + try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { for (TaskDO task : tasks) { @@ -63,7 +67,7 @@ public class TaskDAOImpl implements TaskDAO { public boolean simpleDelete(SimpleTaskQuery condition) throws SQLException { String deleteSQL = "delete from task_info where %s"; String sql = String.format(deleteSQL, condition.getQueryCondition()); - try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { + try (Connection conn = connectionFactory.getConnection(); Statement stat = conn.createStatement()) { stat.executeUpdate(sql); return true; } @@ -74,7 +78,7 @@ public class TaskDAOImpl implements TaskDAO { ResultSet rs = null; String sql = "select * from task_info where " + query.getQueryCondition(); List result = Lists.newLinkedList(); - try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { + try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { rs = ps.executeQuery(); while (rs.next()) { result.add(convert(rs)); @@ -96,7 +100,7 @@ public class TaskDAOImpl implements TaskDAO { String sqlFormat = "select %s from task_info where %s"; String sql = String.format(sqlFormat, query.getQueryContent(), query.getQueryCondition()); List> result = Lists.newLinkedList(); - try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { + try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { rs = ps.executeQuery(); // 原数据,包含了列名 ResultSetMetaData metaData = rs.getMetaData(); @@ -125,7 +129,7 @@ public class TaskDAOImpl implements TaskDAO { public boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) throws SQLException { String sqlFormat = "update task_info set %s where %s"; String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getQueryCondition()); - try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) { + try (Connection conn = connectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) { stat.executeUpdate(); return true; } @@ -136,7 +140,7 @@ public class TaskDAOImpl implements TaskDAO { ResultSet rs = null; List taskResults = Lists.newLinkedList(); String sql = "select task_id, status, result from task_info where instance_id = ? and sub_instance_id = ?"; - try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { + try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { ps.setLong(1, instanceId); ps.setLong(2, subInstanceId); rs = ps.executeQuery(); @@ -168,7 +172,7 @@ public class TaskDAOImpl implements TaskDAO { @Override public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) throws SQLException { String sql = "update task_info set status = ?, last_report_time = ?, result = ?, last_modified_time = ? where instance_id = ? and task_id = ?"; - try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { + try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { ps.setInt(1, status); ps.setLong(2, lastReportTime); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java index 5f5de67a..e363cb26 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java @@ -4,11 +4,14 @@ package com.github.kfcfans.powerjob.worker.persistence; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.SupplierPlus; +import com.github.kfcfans.powerjob.worker.common.OhMyConfig; +import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; @@ -26,24 +29,25 @@ import java.util.Optional; @Slf4j public class TaskPersistenceService { + private final StoreStrategy strategy; + // 默认重试参数 private static final int RETRY_TIMES = 3; private static final long RETRY_INTERVAL_MS = 100; - private static volatile boolean initialized = false; - public static TaskPersistenceService INSTANCE = new TaskPersistenceService(); + private TaskDAO taskDAO; - private TaskPersistenceService() { + public TaskPersistenceService(StoreStrategy strategy) { + this.strategy = strategy; } - private final TaskDAO taskDAO = new TaskDAOImpl(); - public void init() throws Exception { - if (initialized) { - return; - } + + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.initDatasource(strategy); + + taskDAO = new TaskDAOImpl(connectionFactory); taskDAO.initTable(); - initialized = true; } public boolean save(TaskDO task) { diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorTrackerStatusReportReq.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorTrackerStatusReportReq.java index 37941078..e7ba8549 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorTrackerStatusReportReq.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorTrackerStatusReportReq.java @@ -45,7 +45,6 @@ public class ProcessorTrackerStatusReportReq implements OmsSerializable { req.type = IDLE; req.instanceId = instanceId; req.time = System.currentTimeMillis(); - req.address = OhMyWorker.getWorkerAddress(); req.setRemainTaskNum(0); return req; } @@ -55,7 +54,6 @@ public class ProcessorTrackerStatusReportReq implements OmsSerializable { req.type = LOAD; req.instanceId = instanceId; req.time = System.currentTimeMillis(); - req.address = OhMyWorker.getWorkerAddress(); req.setRemainTaskNum(remainTaskNum); return req; } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java index 6663d363..99137a1a 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java @@ -37,9 +37,9 @@ public class TaskTrackerStartTaskReq implements OmsSerializable { /** * 创建 TaskTrackerStartTaskReq,该构造方法必须在 TaskTracker 节点调用 */ - public TaskTrackerStartTaskReq(InstanceInfo instanceInfo, TaskDO task) { + public TaskTrackerStartTaskReq(InstanceInfo instanceInfo, TaskDO task, String taskTrackerAddress) { - this.taskTrackerAddress = OhMyWorker.getWorkerAddress(); + this.taskTrackerAddress = taskTrackerAddress; this.instanceInfo = instanceInfo; this.taskId = task.getTaskId(); diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java index 368947c1..e43c842c 100644 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java +++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob; +import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.worker.persistence.TaskDO; @@ -18,7 +19,7 @@ import java.util.concurrent.ThreadLocalRandom; */ public class PersistenceServiceTest { - private static TaskPersistenceService taskPersistenceService = TaskPersistenceService.INSTANCE; + private static TaskPersistenceService taskPersistenceService = new TaskPersistenceService(StoreStrategy.DISK); @BeforeAll public static void initTable() throws Exception { diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/function/IdleTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/function/IdleTest.java index 0a09a4df..f689bfe2 100644 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/function/IdleTest.java +++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/function/IdleTest.java @@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.TestUtils; import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; +import com.github.kfcfans.powerjob.worker.common.RuntimeMeta; import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; @@ -22,7 +23,7 @@ public class IdleTest extends CommonTest { @Test public void testProcessorTrackerSendIdleReport() throws Exception { TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestBasicProcessor"); - ProcessorTracker pt = new ProcessorTracker(req); + ProcessorTracker pt = new ProcessorTracker(req, new RuntimeMeta()); Thread.sleep(300000); } @@ -32,7 +33,7 @@ public class IdleTest extends CommonTest { ProcessorTrackerStatusReportReq req = ProcessorTrackerStatusReportReq.buildIdleReport(10086L); ServerScheduleJobReq serverScheduleJobReq = TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.API); - TaskTracker taskTracker = TaskTracker.create(serverScheduleJobReq); + TaskTracker taskTracker = TaskTracker.create(serverScheduleJobReq, new RuntimeMeta()); if (taskTracker != null) { taskTracker.receiveProcessorTrackerHeartbeat(req); }