From d46a6de26ec455ef6c03e9054383ae01e04419a7 Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 20 Jan 2023 13:18:58 +0800 Subject: [PATCH] feat: chang to use PowerJobRemoteEngine to replace akka --- .../tech/powerjob/common/RemoteConstant.java | 22 ++++++ .../tech/powerjob/worker/PowerJobWorker.java | 9 --- .../worker/actors/ProcessorTrackerActor.java | 9 +-- .../worker/actors/TaskTrackerActor.java | 4 -- .../background/WorkerHealthReporter.java | 12 ++-- .../worker/common/utils/AkkaUtils.java | 64 ----------------- .../worker/common/utils/TransportUtils.java | 67 +++++++++++++----- .../worker/container/OmsContainerFactory.java | 17 ++--- .../core/processor/ProcessorLoader.java | 6 +- .../runnable/HeavyProcessorRunnable.java | 1 - .../core/processor/sdk/MapProcessor.java | 2 - .../tracker/processor/ProcessorTracker.java | 4 -- .../worker/core/tracker/task/TaskTracker.java | 19 ++--- .../tracker/task/heavy/CommonTaskTracker.java | 11 +-- .../task/heavy/FrequentTaskTracker.java | 10 +-- .../tracker/task/heavy/HeavyTaskTracker.java | 41 +++++------ .../tracker/task/light/LightTaskTracker.java | 9 +-- .../impl/JarContainerProcessorFactory.java | 6 +- .../src/main/resources/oms-worker.akka.conf | 70 ------------------- .../worker/test/CommonTaskTrackerTest.java | 68 ------------------ .../tech/powerjob/worker/test/CommonTest.java | 33 --------- .../worker/test/FrequentTaskTrackerTest.java | 53 -------------- .../src/test/resources/oms-akka-test.conf | 18 ----- 23 files changed, 115 insertions(+), 440 deletions(-) delete mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/AkkaUtils.java delete mode 100644 powerjob-worker/src/main/resources/oms-worker.akka.conf delete mode 100644 powerjob-worker/src/test/java/tech/powerjob/worker/test/CommonTaskTrackerTest.java delete mode 100644 powerjob-worker/src/test/java/tech/powerjob/worker/test/FrequentTaskTrackerTest.java delete mode 100644 powerjob-worker/src/test/resources/oms-akka-test.conf diff --git a/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java b/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java index bfd4246a..afcdde5a 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java @@ -36,7 +36,29 @@ public class RemoteConstant { /* ************************ SERVER ************************ */ public static final String SERVER_PATH = "server"; + /** + * server 处理在线日志 + */ public static final String SERVER_HANDLER_REPORT_LOG = "reportLog"; + /** + * server 处理 worker 心跳 + */ + public static final String SERVER_HANDLER_WORKER_HEARTBEAT = "workerHeartbeat"; + + /** + * server 处理 TaskTracker 上报的任务实例状态 + */ + public static final String SERVER_HANDLER_REPORT_INSTANCE_STATUS = "reportInstanceStatus"; + + /** + * server 查询任务的可执行集群 + */ + public static final String SERVER_HANDLER_QUERY_JOB_CLUSTER = "queryJobCluster"; + + /** + * server 处理 worker 请求部署容器命令 + */ + public static final String SERVER_HANDLER_WORKER_NEED_DEPLOY_CONTAINER = "container"; /* ************************ Worker-TaskTracker ************************ */ public static final String WTT_PATH = "taskTracker"; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index 15130e53..e32ecec8 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -1,22 +1,13 @@ package tech.powerjob.worker; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.DeadLetter; -import akka.actor.Props; -import akka.routing.RoundRobinPool; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; -import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.common.serialize.JsonUtils; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java index bcc76dd4..3b933386 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java @@ -1,20 +1,17 @@ package tech.powerjob.worker.actors; -import akka.actor.AbstractActor; -import akka.actor.Props; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; import tech.powerjob.common.RemoteConstant; import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.remote.framework.actor.Handler; import tech.powerjob.remote.framework.actor.ProcessType; import tech.powerjob.worker.common.WorkerRuntime; -import tech.powerjob.worker.core.tracker.processor.ProcessorTracker; import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager; +import tech.powerjob.worker.core.tracker.processor.ProcessorTracker; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.util.CollectionUtils; import java.util.List; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java index 8b920b1c..0e3e0bf7 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java @@ -1,11 +1,7 @@ package tech.powerjob.worker.actors; -import akka.actor.AbstractActor; -import akka.actor.Props; import com.google.common.collect.Lists; -import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.InstanceDetail; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java index 50d50f20..0e155ce1 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java @@ -1,17 +1,16 @@ package tech.powerjob.worker.background; -import akka.actor.ActorSelection; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; import tech.powerjob.common.enums.Protocol; import tech.powerjob.common.model.SystemMetrics; import tech.powerjob.common.request.WorkerHeartbeat; import tech.powerjob.worker.common.PowerJobWorkerVersion; import tech.powerjob.worker.common.WorkerRuntime; -import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.worker.common.utils.SystemInfoUtils; +import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.container.OmsContainerFactory; import lombok.extern.slf4j.Slf4j; -import org.springframework.util.StringUtils; import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager; import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager; @@ -68,8 +67,7 @@ public class WorkerHealthReporter implements Runnable { // 获取当前加载的容器列表 heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos()); // 发送请求 - String serverPath = AkkaUtils.getServerActorPath(currentServer); - if (StringUtils.isEmpty(serverPath)) { + if (StringUtils.isEmpty(currentServer)) { return; } // log @@ -82,7 +80,7 @@ public class WorkerHealthReporter implements Runnable { workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum(), heartbeat.getHeavyTaskTrackerNum() ); - ActorSelection actorSelection = workerRuntime.getActorSystem().actorSelection(serverPath); - actorSelection.tell(heartbeat, null); + + TransportUtils.reportWorkerHeartbeat(heartbeat, currentServer, workerRuntime.getTransporter()); } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/AkkaUtils.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/AkkaUtils.java deleted file mode 100644 index c94d2f2f..00000000 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/AkkaUtils.java +++ /dev/null @@ -1,64 +0,0 @@ -package tech.powerjob.worker.common.utils; - -import akka.actor.ActorSelection; -import akka.pattern.Patterns; -import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.common.response.AskResponse; -import tech.powerjob.common.RemoteConstant; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; - -import java.time.Duration; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; - -/** - * AKKA 工具类 - * - * @author tjq - * @since 2020/3/17 - */ -@Slf4j -public class AkkaUtils { - - /** - * akka://@:/ - */ - private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s"; - - public static String getAkkaWorkerPath(String address, String actorName) { - return String.format(AKKA_NODE_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, actorName); - } - - public static String getServerActorPath(String serverAddress) { - if (StringUtils.isEmpty(serverAddress)) { - return null; - } - return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, serverAddress, RemoteConstant.SERVER_ACTOR_NAME); - } - - /** - * 可靠传输 - * @param remote 远程 AKKA 节点 - * @param msg 需要传输的对象 - * @return true: 对方接收成功 / false: 对方接收失败(可能传输成功但对方处理失败,需要协同处理 AskResponse 返回值) - */ - public static boolean reliableTransmit(ActorSelection remote, Object msg) { - try { - return easyAsk(remote, msg).isSuccess(); - }catch (Exception e) { - log.warn("[PowerTransmitter] transmit {} failed", msg, e); - } - return false; - } - - public static AskResponse easyAsk(ActorSelection remote, Object msg) { - try { - CompletionStage ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); - return (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - }catch (Exception e) { - throw new PowerJobException(e); - } - } - -} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java index 457d31e9..0bc7ea08 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java @@ -1,9 +1,11 @@ package tech.powerjob.worker.common.utils; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.PowerSerializable; import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.exception.PowerJobCheckedException; -import tech.powerjob.common.request.WorkerLogReportReq; +import tech.powerjob.common.request.*; import tech.powerjob.common.response.AskResponse; import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.HandlerLocation; @@ -11,9 +13,7 @@ import tech.powerjob.remote.framework.base.ServerType; import tech.powerjob.remote.framework.base.URL; import tech.powerjob.remote.framework.transporter.Transporter; import tech.powerjob.worker.common.WorkerRuntime; -import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest; -import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; -import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; +import tech.powerjob.worker.pojo.request.*; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -29,6 +29,21 @@ import static tech.powerjob.common.RemoteConstant.*; @Slf4j public class TransportUtils { + public static void ttReportInstanceStatus(TaskTrackerReportInstanceStatusReq req, String address, Transporter transporter) { + final URL url = easyBuildUrl(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_REPORT_INSTANCE_STATUS, address); + transporter.tell(url, req); + } + + public static void ttStartPtTask(TaskTrackerStartTaskReq req, String address, Transporter transporter) { + final URL url = easyBuildUrl(ServerType.WORKER, WPT_PATH, WPT_HANDLER_START_TASK, address); + transporter.tell(url, req); + } + + public static void ttStopPtInstance(TaskTrackerStopInstanceReq req, String address, Transporter transporter) { + final URL url = easyBuildUrl(ServerType.WORKER, WPT_PATH, WPT_HANDLER_STOP_INSTANCE, address); + transporter.tell(url, req); + } + public static void ptReportTask(ProcessorReportTaskStatusReq req, String address, WorkerRuntime workerRuntime) { final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_TASK_STATUS, address); workerRuntime.getTransporter().tell(url, req); @@ -44,15 +59,14 @@ public class TransportUtils { transporter.tell(url, req); } + public static void reportWorkerHeartbeat(WorkerHeartbeat req, String address, Transporter transporter) { + final URL url = easyBuildUrl(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_WORKER_HEARTBEAT, address); + transporter.tell(url, req); + } + public static boolean reliablePtReportTask(ProcessorReportTaskStatusReq req, String address, WorkerRuntime workerRuntime) { try { - final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_TASK_STATUS, address); - final CompletionStage completionStage = workerRuntime.getTransporter().ask(url, req, AskResponse.class); - return completionStage - .toCompletableFuture() - .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS) - .isSuccess(); - + return reliableAsk(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_TASK_STATUS, address, req, workerRuntime.getTransporter()).isSuccess(); } catch (Exception e) { log.warn("[PowerJobTransport] reliablePtReportTask failed: {}", req, e); return false; @@ -61,18 +75,35 @@ public class TransportUtils { public static boolean reliableMapTask(ProcessorMapTaskRequest req, String address, WorkerRuntime workerRuntime) throws PowerJobCheckedException { try { - final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address); - final CompletionStage completionStage = workerRuntime.getTransporter().ask(url, req, AskResponse.class); - return completionStage - .toCompletableFuture() - .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS) - .isSuccess(); - + return reliableAsk(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address, req, workerRuntime.getTransporter()).isSuccess(); } catch (Throwable throwable) { throw new PowerJobCheckedException(throwable); } } + @SneakyThrows + public static boolean reliableTtReportInstanceStatus(TaskTrackerReportInstanceStatusReq req, String address, Transporter transporter) { + return reliableAsk(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_REPORT_INSTANCE_STATUS, address, req, transporter).isSuccess(); + } + + @SneakyThrows + public static AskResponse reliableQueryJobCluster(WorkerQueryExecutorClusterReq req, String address, Transporter transporter) { + return reliableAsk(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_QUERY_JOB_CLUSTER, address, req, transporter); + } + + @SneakyThrows + public static AskResponse reliableQueryContainerInfo(WorkerNeedDeployContainerRequest req, String address, Transporter transporter) { + return reliableAsk(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, address, req, transporter); + } + + private static AskResponse reliableAsk(ServerType t, String rootPath, String handlerPath, String address, PowerSerializable req, Transporter transporter) throws Exception { + final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address); + final CompletionStage completionStage = transporter.ask(url, req, AskResponse.class); + return completionStage + .toCompletableFuture() + .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) { HandlerLocation handlerLocation = new HandlerLocation() .setServerType(serverType) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/container/OmsContainerFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/container/OmsContainerFactory.java index 6993af34..2834b13b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/container/OmsContainerFactory.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/container/OmsContainerFactory.java @@ -1,8 +1,5 @@ package tech.powerjob.worker.container; -import akka.actor.ActorSelection; -import akka.pattern.Patterns; -import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.model.DeployedContainerInfo; import tech.powerjob.common.request.ServerDeployContainerRequest; import tech.powerjob.common.request.WorkerNeedDeployContainerRequest; @@ -12,14 +9,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; +import tech.powerjob.worker.common.WorkerRuntime; +import tech.powerjob.worker.common.utils.TransportUtils; import java.io.File; import java.net.URL; -import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; /** * 容器工厂 @@ -39,16 +35,14 @@ public class OmsContainerFactory { * @param serverActor 当容器不存在且 serverActor 非空时,尝试从服务端重新拉取容器 * @return 容器示例,可能为 null */ - public static OmsContainer fetchContainer(Long containerId, ActorSelection serverActor) { + public static OmsContainer fetchContainer(Long containerId, WorkerRuntime workerRuntime) { OmsContainer omsContainer = CARGO.get(containerId); if (omsContainer != null) { return omsContainer; } - if (serverActor == null) { - return null; - } + final String currentServerAddress = workerRuntime.getServerDiscoveryService().getCurrentServerAddress(); // 尝试从 server 加载 log.info("[OmsContainer-{}] can't find the container in factory, try to deploy from server.", containerId); @@ -56,8 +50,7 @@ public class OmsContainerFactory { try { - CompletionStage askCS = Patterns.ask(serverActor, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); - AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + AskResponse askResponse = TransportUtils.reliableQueryContainerInfo(request, currentServerAddress, workerRuntime.getTransporter()); if (askResponse.isSuccess()) { ServerDeployContainerRequest deployRequest = askResponse.getData(ServerDeployContainerRequest.class); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/ProcessorLoader.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/ProcessorLoader.java index 9256f337..0b6569e7 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/ProcessorLoader.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/ProcessorLoader.java @@ -1,12 +1,10 @@ package tech.powerjob.worker.core.processor; -import akka.actor.ActorSelection; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.worker.common.WorkerRuntime; -import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.worker.common.utils.SpringUtils; import tech.powerjob.worker.container.OmsContainer; import tech.powerjob.worker.container.OmsContainerFactory; @@ -62,9 +60,7 @@ public class ProcessorLoader { String[] split = processorInfo.split("#"); log.info("[ProcessorLoader] try to load processor({}) in container({})", split[1], split[0]); - String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress()); - ActorSelection actorSelection = workerRuntime.getActorSystem().actorSelection(serverPath); - OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), actorSelection); + OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), workerRuntime); if (omsContainer != null) { processorInfoHolder = ProcessorInfo.of(omsContainer.getProcessor(split[1]), omsContainer.getContainerClassLoader()); } else { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java index 454895f3..b737e6a6 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java @@ -5,7 +5,6 @@ import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.ThreadLocalStore; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; -import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.common.serialize.SerializerUtils; import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.common.utils.WorkflowContextUtils; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java index 3cea7976..db2dc6ab 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java @@ -3,12 +3,10 @@ package tech.powerjob.worker.core.processor.sdk; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; -import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.exception.PowerJobCheckedException; import tech.powerjob.worker.common.ThreadLocalStore; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; -import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 05ebe598..a926ec77 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -9,12 +9,9 @@ import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.common.model.LogConfig; -import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskStatus; -import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.core.processor.ProcessorInfo; import tech.powerjob.worker.core.processor.runnable.HeavyProcessorRunnable; @@ -22,7 +19,6 @@ import tech.powerjob.worker.core.processor.ProcessorLoader; import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager; import tech.powerjob.worker.log.OmsLogger; import tech.powerjob.worker.log.OmsLoggerFactory; -import tech.powerjob.worker.log.impl.OmsServerLogger; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.pojo.model.InstanceInfo; import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java index f3546db1..bbbb45df 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java @@ -1,7 +1,5 @@ package tech.powerjob.worker.core.tracker.task; -import akka.actor.ActorSelection; -import akka.pattern.Patterns; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; @@ -9,16 +7,12 @@ import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; -import tech.powerjob.common.response.AskResponse; import tech.powerjob.worker.common.WorkerRuntime; -import tech.powerjob.worker.common.utils.AkkaUtils; +import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.pojo.model.InstanceInfo; -import java.time.Duration; import java.util.Collections; import java.util.Map; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -107,18 +101,15 @@ public abstract class TaskTracker { response.setStartTime(System.currentTimeMillis()); response.setSourceAddress(workerRuntime.getWorkerAddress()); - String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress()); - ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath); - serverActor.tell(response, null); + TransportUtils.ttReportInstanceStatus(response, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter()); } - protected void reportFinalStatusThenDestroy(ActorSelection serverActor, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) { + protected void reportFinalStatusThenDestroy(WorkerRuntime workerRuntime, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) { + String currentServerAddress = workerRuntime.getServerDiscoveryService().getCurrentServerAddress(); // 最终状态需要可靠上报 - CompletionStage ask = Patterns.ask(serverActor, reportInstanceStatusReq, Duration.ofSeconds(15)); boolean serverAccepted = false; try { - AskResponse askResponse = (AskResponse) ask.toCompletableFuture().get(15, TimeUnit.SECONDS); - serverAccepted = askResponse.isSuccess(); + serverAccepted = TransportUtils.reliableTtReportInstanceStatus(reportInstanceStatusReq, currentServerAddress, workerRuntime.getTransporter()); } catch (Exception e) { log.warn("[TaskTracker-{}] report finished status failed, req={}.", instanceId, reportInstanceStatusReq, e); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java index ef9c2e0a..126d8204 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java @@ -1,7 +1,5 @@ package tech.powerjob.worker.core.tracker.task.heavy; -import akka.actor.ActorSelection; -import akka.pattern.Patterns; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.ToString; @@ -20,7 +18,7 @@ import tech.powerjob.common.response.AskResponse; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; -import tech.powerjob.worker.common.utils.AkkaUtils; +import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.persistence.TaskDO; import java.time.Duration; @@ -229,22 +227,19 @@ public class CommonTaskTracker extends HeavyTaskTracker { result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT; } - String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress()); - ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath); - // 4. 执行完毕,报告服务器 if (finished.get()) { req.setResult(result); // 上报追加的工作流上下文信息 req.setAppendedWfContext(appendedWfContext); req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV()); - reportFinalStatusThenDestroy(serverActor,req); + reportFinalStatusThenDestroy(workerRuntime, req); return; } // 5. 未完成,上报状态 req.setInstanceStatus(InstanceStatus.RUNNING.getV()); - serverActor.tell(req, null); + TransportUtils.ttReportInstanceStatus(req, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter()); // 6.1 定期检查 -> 重试派发后未确认的任务 long currentMS = System.currentTimeMillis(); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java index f46ea3de..fd1a0c97 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java @@ -1,6 +1,5 @@ package tech.powerjob.worker.core.tracker.task.heavy; -import akka.actor.ActorSelection; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; @@ -22,8 +21,8 @@ import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; -import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.worker.common.utils.LRUCache; +import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.persistence.TaskDO; import java.util.*; @@ -358,13 +357,8 @@ public class FrequentTaskTracker extends HeavyTaskTracker { log.warn("[FQTaskTracker-{}] report alert req,time:{}", instanceId, req.getReportTime()); } - String serverPath = AkkaUtils.getServerActorPath(currentServerAddress); - if (StringUtils.isEmpty(serverPath)) { - return; - } // 非可靠通知,Server挂掉后任务的kill工作交由其他线程去做 - ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath); - serverActor.tell(req, null); + TransportUtils.ttReportInstanceStatus(req, currentServerAddress, workerRuntime.getTransporter()); } /** diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index ab459392..398157db 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -1,15 +1,13 @@ package tech.powerjob.worker.core.tracker.task.heavy; -import akka.actor.ActorSelection; import com.fasterxml.jackson.core.type.TypeReference; import lombok.AllArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.springframework.util.CollectionUtils; import tech.powerjob.common.enums.ExecuteType; -import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerScheduleJobReq; -import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import tech.powerjob.common.request.WorkerQueryExecutorClusterReq; import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.utils.CommonUtils; @@ -18,7 +16,7 @@ import tech.powerjob.common.utils.SegmentLock; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; -import tech.powerjob.worker.common.utils.AkkaUtils; +import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.common.utils.WorkflowContextUtils; import tech.powerjob.worker.core.ha.ProcessorTrackerStatusHolder; import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager; @@ -32,15 +30,10 @@ import com.google.common.base.Stopwatch; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; -import org.springframework.util.CollectionUtils; -import org.springframework.util.StringUtils; import javax.annotation.Nullable; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -365,11 +358,9 @@ public abstract class HeavyTaskTracker extends TaskTracker { // 1. 通知 ProcessorTracker 释放资源 TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq(); stopRequest.setInstanceId(instanceId); - ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> { - String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); - ActorSelection ptActor = workerRuntime.getActorSystem().actorSelection(ptPath); + ptStatusHolder.getAllProcessorTrackers().forEach(ptAddress -> { // 不可靠通知,ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭 - ptActor.tell(stopRequest, null); + TransportUtils.ttStopPtInstance(stopRequest, ptAddress, workerRuntime.getTransporter()); }); // 2. 删除所有数据库数据 @@ -425,9 +416,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { // 4. 任务派发 TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress()); - String ptActorPath = AkkaUtils.getAkkaWorkerPath(processorTrackerAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); - ActorSelection ptActor = workerRuntime.getActorSystem().actorSelection(ptActorPath); - ptActor.tell(startTaskReq, null); + TransportUtils.ttStartPtTask(startTaskReq, processorTrackerAddress, workerRuntime.getTransporter()); log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName()); } @@ -524,18 +513,20 @@ public abstract class HeavyTaskTracker extends TaskTracker { return; } - String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress()); - if (StringUtils.isEmpty(serverPath)) { + final String currentServerAddress = workerRuntime.getServerDiscoveryService().getCurrentServerAddress(); + if (StringUtils.isEmpty(currentServerAddress)) { log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId); return; } - WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(workerRuntime.getAppId(), instanceInfo.getJobId()); - AskResponse response = AkkaUtils.easyAsk(workerRuntime.getActorSystem().actorSelection(serverPath), req); - if (!response.isSuccess()) { - log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage()); - return; - } + try { + WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(workerRuntime.getAppId(), instanceInfo.getJobId()); + AskResponse response = TransportUtils.reliableQueryJobCluster(req, currentServerAddress, workerRuntime.getTransporter()); + if (!response.isSuccess()) { + log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage()); + return; + } + List workerList = JsonUtils.parseObject(response.getData(), new TypeReference>() {}); ptStatusHolder.register(workerList); } catch (Exception e) { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java index d46fc5dd..577a285e 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java @@ -1,6 +1,5 @@ package tech.powerjob.worker.core.tracker.task.light; -import akka.actor.ActorSelection; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.StringUtils; @@ -13,7 +12,7 @@ import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; -import tech.powerjob.worker.common.utils.AkkaUtils; +import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.core.processor.*; import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager; import tech.powerjob.worker.core.tracker.task.TaskTracker; @@ -249,8 +248,6 @@ public class LightTaskTracker extends TaskTracker { log.info("[TaskTracker-{}] has been destroyed,final status is {},needn't to report status!", instanceId, status); return; } - String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress()); - ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath); TaskTrackerReportInstanceStatusReq reportInstanceStatusReq = new TaskTrackerReportInstanceStatusReq(); reportInstanceStatusReq.setAppId(workerRuntime.getAppId()); reportInstanceStatusReq.setJobId(instanceInfo.getJobId()); @@ -303,13 +300,13 @@ public class LightTaskTracker extends TaskTracker { reportInstanceStatusReq.setEndTime(taskEndTime); // 微操一下,上报最终状态时重新设置下时间,并且增加一小段偏移,保证在并发上报运行中状态以及最终状态时,最终状态的上报时间晚于运行中的状态 reportInstanceStatusReq.setReportTime(System.currentTimeMillis() + 1); - reportFinalStatusThenDestroy(serverActor, reportInstanceStatusReq); + reportFinalStatusThenDestroy(workerRuntime, reportInstanceStatusReq); return; } // 未完成的任务,只需要上报状态 reportInstanceStatusReq.setInstanceStatus(InstanceStatus.RUNNING.getV()); log.info("[TaskTracker-{}] report status({}) success,real status is {}", instanceId, reportInstanceStatusReq, status); - serverActor.tell(reportInstanceStatusReq, null); + TransportUtils.ttReportInstanceStatus(reportInstanceStatusReq, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter()); } private void timeoutCheck() { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java index 9d815dbc..bf4bb5c6 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java @@ -1,11 +1,9 @@ package tech.powerjob.worker.processor.impl; -import akka.actor.ActorSelection; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.worker.common.WorkerRuntime; -import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.worker.container.OmsContainer; import tech.powerjob.worker.container.OmsContainerFactory; import tech.powerjob.worker.extension.processor.ProcessorBean; @@ -45,9 +43,7 @@ public class JarContainerProcessorFactory implements ProcessorFactory { log.info("[ProcessorFactory] try to load processor({}) in container({})", className, containerName); - String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress()); - ActorSelection actorSelection = workerRuntime.getActorSystem().actorSelection(serverPath); - OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerName), actorSelection); + OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerName), workerRuntime); if (omsContainer != null) { return new ProcessorBean() .setProcessor(omsContainer.getProcessor(className)) diff --git a/powerjob-worker/src/main/resources/oms-worker.akka.conf b/powerjob-worker/src/main/resources/oms-worker.akka.conf deleted file mode 100644 index 16f7b911..00000000 --- a/powerjob-worker/src/main/resources/oms-worker.akka.conf +++ /dev/null @@ -1,70 +0,0 @@ -akka { - - loggers = ["akka.event.slf4j.Slf4jLogger"] - loglevel = "WARNING" - - actor { - # cluster is better(recommend by official document), but I prefer remote - provider = remote - allow-java-serialization = off - - serializers { - power-serializer = "tech.powerjob.common.serialize.PowerAkkaSerializer" - } - - serialization-bindings { - "tech.powerjob.common.PowerSerializable" = power-serializer - } - } - remote { - artery { - transport = tcp # See Selecting a transport below - # over write by code - canonical.hostname = "127.0.0.1" - canonical.port = 25520 - } - } - - # dispatcher - task-tracker-dispatcher { - # Dispatcher is the name of the event-based dispatcher - type = Dispatcher - # What kind of ExecutionService to use - executor = "fork-join-executor" - # Configuration for the fork join pool - fork-join-executor { - # Min number of threads to cap factor-based parallelism number to - parallelism-min = 2 - # Parallelism (threads) ... ceil(available processors * factor) - parallelism-factor = 4.0 - # Max number of threads to cap factor-based parallelism number to - parallelism-max = 64 - } - # Throughput defines the maximum number of messages to be - # processed per actor before the thread jumps to the next actor. - # Set to 1 for as fair as possible. - throughput = 10 - } - - processor-tracker-dispatcher { - type = Dispatcher - executor = "fork-join-executor" - fork-join-executor { - parallelism-min = 2 - parallelism-factor = 2.0 - parallelism-max = 64 - } - throughput = 10 - } - - worker-common-dispatcher { - type = Dispatcher - executor = "fork-join-executor" - fork-join-executor { - parallelism-min = 2 - parallelism-factor = 2.0 - parallelism-max = 8 - } - throughput = 10 - } -} \ No newline at end of file diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/test/CommonTaskTrackerTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/test/CommonTaskTrackerTest.java deleted file mode 100644 index d3becf1f..00000000 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/test/CommonTaskTrackerTest.java +++ /dev/null @@ -1,68 +0,0 @@ -package tech.powerjob.worker.test; - -import akka.actor.ActorSelection; -import akka.actor.ActorSystem; -import tech.powerjob.common.RemoteConstant; -import tech.powerjob.common.enums.ExecuteType; -import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.worker.PowerJobWorker; -import tech.powerjob.worker.common.PowerJobWorkerConfig; -import tech.powerjob.worker.common.utils.AkkaUtils; -import tech.powerjob.common.utils.NetUtils; -import com.google.common.collect.Lists; -import com.typesafe.config.ConfigFactory; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -/** - * 测试完整的 JobInstance 执行流程 - * - * @author tjq - * @since 2020/3/25 - */ -public class CommonTaskTrackerTest { - - private static ActorSelection remoteTaskTracker; - - @BeforeAll - public static void init() throws Exception { - - PowerJobWorkerConfig workerConfig = new PowerJobWorkerConfig(); - workerConfig.setAppName("oms-test"); - workerConfig.setServerAddress(Lists.newArrayList("127.0.0.1:7700")); - workerConfig.setEnableTestMode(true); - - PowerJobWorker worker = new PowerJobWorker(); - worker.setConfig(workerConfig); - worker.init(); - - ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.TASK_TRACKER_ACTOR_NAME); - remoteTaskTracker = testAS.actorSelection(akkaRemotePath); - } - - @Test - public void justStartWorkerToTestServer() throws Exception { - Thread.sleep(277277277); - } - - @Test - public void testStandaloneJob() throws Exception { - - remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.CRON), null); - Thread.sleep(5000000); - } - - @Test - public void testMapReduceJob() throws Exception { - remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.MAP_REDUCE, TimeExpressionType.CRON), null); - Thread.sleep(5000000); - } - - @Test - public void testBroadcast() throws Exception { - remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.BROADCAST, TimeExpressionType.CRON), null); - Thread.sleep(5000000); - } - -} diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/test/CommonTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/test/CommonTest.java index 093ff9df..080bdf74 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/test/CommonTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/test/CommonTest.java @@ -1,19 +1,10 @@ package tech.powerjob.worker.test; -import akka.actor.ActorSelection; -import akka.actor.ActorSystem; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; -import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.worker.PowerJobWorker; -import tech.powerjob.worker.common.PowerJobWorkerConfig; -import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.worker.pojo.model.InstanceInfo; import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; -import com.typesafe.config.ConfigFactory; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; /** * 启动公共服务 @@ -23,30 +14,6 @@ import org.junit.jupiter.api.BeforeAll; */ public class CommonTest { - protected static ActorSelection remoteProcessorTracker; - protected static ActorSelection remoteTaskTracker; - - @BeforeAll - public static void startWorker() throws Exception { - PowerJobWorkerConfig workerConfig = new PowerJobWorkerConfig(); - workerConfig.setAppName("oms-test"); - workerConfig.setEnableTestMode(true); - - PowerJobWorker worker = new PowerJobWorker(); - worker.setConfig(workerConfig); - worker.init(); - - ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); - String address = NetUtils.getLocalHost() + ":27777"; - - remoteProcessorTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME)); - remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.TASK_TRACKER_ACTOR_NAME)); - } - - @AfterAll - public static void stop() throws Exception { - Thread.sleep(120000); - } public static TaskTrackerStartTaskReq genTaskTrackerStartTaskReq(String processor) { diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/test/FrequentTaskTrackerTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/test/FrequentTaskTrackerTest.java deleted file mode 100644 index 0bf630c5..00000000 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/test/FrequentTaskTrackerTest.java +++ /dev/null @@ -1,53 +0,0 @@ -package tech.powerjob.worker.test; - -import akka.actor.ActorSelection; -import akka.actor.ActorSystem; -import tech.powerjob.common.enums.ExecuteType; -import tech.powerjob.common.RemoteConstant; -import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.worker.PowerJobWorker; -import tech.powerjob.worker.common.PowerJobWorkerConfig; -import tech.powerjob.worker.common.utils.AkkaUtils; -import com.google.common.collect.Lists; -import com.typesafe.config.ConfigFactory; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -/** - * description - * - * @author tjq - * @since 2020/4/9 - */ -public class FrequentTaskTrackerTest { - - private static ActorSelection remoteTaskTracker; - - @BeforeAll - public static void init() throws Exception { - - PowerJobWorkerConfig workerConfig = new PowerJobWorkerConfig(); - workerConfig.setAppName("oms-test"); - workerConfig.setServerAddress(Lists.newArrayList("127.0.0.1:7700")); - PowerJobWorker worker = new PowerJobWorker(); - worker.setConfig(workerConfig); - worker.init(); - - ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.TASK_TRACKER_ACTOR_NAME); - remoteTaskTracker = testAS.actorSelection(akkaRemotePath); - } - - @Test - public void testFixRateJob() throws Exception { - remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.FIXED_RATE), null); - Thread.sleep(5000000); - } - - @Test - public void testFixDelayJob() throws Exception { - remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.MAP_REDUCE, TimeExpressionType.FIXED_DELAY), null); - Thread.sleep(5000000); - } -} diff --git a/powerjob-worker/src/test/resources/oms-akka-test.conf b/powerjob-worker/src/test/resources/oms-akka-test.conf deleted file mode 100644 index 7615d41a..00000000 --- a/powerjob-worker/src/test/resources/oms-akka-test.conf +++ /dev/null @@ -1,18 +0,0 @@ -akka { - actor { - # for test - provider = remote - allow-java-serialization = off - - serialization-bindings { - "OmsSerializable" = jackson-cbor - } - } - remote { - artery { - transport = tcp # See Selecting a transport below - canonical.hostname = "127.0.0.1" - canonical.port = 25521 - } - } -} \ No newline at end of file