From 2020f729055037605129b3635f6cb11733162013 Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 20 Jan 2023 14:19:09 +0800 Subject: [PATCH] feat: use PowerJobRemoteEngine to replace akka --- .../tech/powerjob/common/RemoteConstant.java | 5 ----- .../framework/base/HandlerLocation.java | 4 ---- .../powerjob/remote/framework/base/URL.java | 6 ++++++ .../remote/akka/AkkaMappingService.java | 10 +++++++++- .../powerjob/remote/akka/AkkaTransporter.java | 7 +------ .../src/main/resources/logback-dev.xml | 2 ++ powerjob-worker/pom.xml | 19 ++++++++++++++++++- .../tech/powerjob/worker/PowerJobWorker.java | 15 +++++++++------ .../powerjob/worker/actors/WorkerActor.java | 6 +++++- .../worker/common/PowerJobWorkerConfig.java | 5 +++++ .../worker/common/utils/TransportUtils.java | 2 +- 11 files changed, 56 insertions(+), 25 deletions(-) diff --git a/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java b/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java index fe1883a0..f2f2e5ff 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java @@ -14,12 +14,7 @@ public class RemoteConstant { public static final String WORKER_ACTOR_SYSTEM_NAME = "oms"; - public static final String TASK_TRACKER_ACTOR_NAME = "task_tracker"; - public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker"; public static final String WORKER_ACTOR_NAME = "worker"; - public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting"; - - public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf"; /* ************************ AKKA SERVER ************************ */ diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/HandlerLocation.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/HandlerLocation.java index 31fc498a..6f9cc5ba 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/HandlerLocation.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/HandlerLocation.java @@ -26,10 +26,6 @@ public class HandlerLocation implements Serializable { * 方法路径 */ private String methodPath; - /** - * 调用的集群类型(用于兼容 AKKA 等除了IP还需要指定 system 访问的情况) - */ - private ServerType serverType; public String toPath() { return String.format("/%s/%s", rootPath, methodPath); diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/URL.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/URL.java index 24e71b64..c1909b99 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/URL.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/URL.java @@ -14,6 +14,12 @@ import java.io.Serializable; @Data @Accessors(chain = true) public class URL implements Serializable { + + /** + * 调用的集群类型(用于兼容 AKKA 等除了IP还需要指定 system 访问的情况) + */ + private ServerType serverType; + /** * remote address */ diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaMappingService.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaMappingService.java index ff115eeb..904b47f2 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaMappingService.java +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaMappingService.java @@ -4,6 +4,7 @@ import com.google.common.collect.Maps; import lombok.Getter; import lombok.Setter; import lombok.experimental.Accessors; +import tech.powerjob.common.RemoteConstant; import java.util.Map; @@ -21,7 +22,7 @@ public class AkkaMappingService { private static final Map RP_2_ACTOR_CFG = Maps.newHashMap(); static { - // TODO: 迁移时写入规则 + addMappingRule(RemoteConstant.SERVER_PATH, "server_actor", null); } private static final String DEFAULT_DISPATCH_NAME = "common-dispatcher"; @@ -46,4 +47,11 @@ public class AkkaMappingService { private String actorName; private String dispatcherName; } + + private static void addMappingRule(String newActorPath, String oldActorName, String dispatchName) { + ActorConfig actorConfig = new ActorConfig() + .setActorName(oldActorName) + .setDispatcherName(dispatchName == null ? DEFAULT_DISPATCH_NAME : dispatchName); + RP_2_ACTOR_CFG.put(newActorPath, actorConfig); + } } diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java index 4fcc3373..0151fd14 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaTransporter.java @@ -3,22 +3,17 @@ package tech.powerjob.remote.akka; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.pattern.Patterns; -import com.google.common.collect.Maps; import tech.powerjob.common.PowerSerializable; import tech.powerjob.common.RemoteConstant; -import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.remote.framework.base.HandlerLocation; import tech.powerjob.remote.framework.base.RemotingException; -import tech.powerjob.remote.framework.base.ServerType; import tech.powerjob.remote.framework.base.URL; import tech.powerjob.remote.framework.transporter.Protocol; import tech.powerjob.remote.framework.transporter.Transporter; import java.time.Duration; -import java.util.Map; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutorService; /** * AkkaTransporter @@ -61,7 +56,7 @@ public class AkkaTransporter implements Transporter { private ActorSelection fetchActorSelection(URL url) { HandlerLocation location = url.getLocation(); - String targetActorSystemName = AkkaConstant.fetchActorSystemName(location.getServerType()); + String targetActorSystemName = AkkaConstant.fetchActorSystemName(url.getServerType()); String targetActorName = AkkaMappingService.parseActorName(location.getRootPath()).getActorName(); diff --git a/powerjob-server/powerjob-server-starter/src/main/resources/logback-dev.xml b/powerjob-server/powerjob-server-starter/src/main/resources/logback-dev.xml index 1c58dc7c..9951c3bc 100644 --- a/powerjob-server/powerjob-server-starter/src/main/resources/logback-dev.xml +++ b/powerjob-server/powerjob-server-starter/src/main/resources/logback-dev.xml @@ -12,6 +12,8 @@ + + diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index 73cbd036..5f1d8422 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -20,7 +20,10 @@ 5.9.1 1.2.9 + 4.2.1 + 4.2.1 + 4.2.1 @@ -32,13 +35,27 @@ ${spring.version} - + tech.powerjob powerjob-remote-framework ${powerjob-remote-framework.version} + + + tech.powerjob + powerjob-remote-impl-akka + ${powerjob-remote-impl-akka.version} + + + + tech.powerjob + powerjob-remote-impl-http + ${powerjob-remote-impl-http.version} + provided + + com.h2database diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index e32ecec8..ca37043c 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -33,7 +33,6 @@ import tech.powerjob.worker.common.utils.SpringUtils; import tech.powerjob.worker.core.executor.ExecutorManager; import tech.powerjob.worker.persistence.TaskPersistenceService; -import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,7 +48,7 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean private final WorkerRuntime workerRuntime = new WorkerRuntime(); - private RemoteEngine remoteEngine; + private final RemoteEngine remoteEngine = new PowerJobRemoteEngine(); private final AtomicBoolean initialized = new AtomicBoolean(); @Override @@ -96,14 +95,18 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig()); workerRuntime.setExecutorManager(executorManager); - // 初始化 ActorSystem(macOS上 new ServerSocket 检测端口占用的方法并不生效,可能是AKKA是Scala写的缘故?没办法...只能靠异常重试了) + // 初始化 actor + TaskTrackerActor taskTrackerActor = new TaskTrackerActor(workerRuntime); + ProcessorTrackerActor processorTrackerActor = new ProcessorTrackerActor(workerRuntime); + WorkerActor workerActor = new WorkerActor(workerRuntime, taskTrackerActor); + + // 初始化通讯引擎 EngineConfig engineConfig = new EngineConfig() - .setType("") + .setType(config.getProtocol().name()) .setServerType(ServerType.WORKER) .setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort())) - .setActorList(Lists.newArrayList()); + .setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor)); - remoteEngine = new PowerJobRemoteEngine(); EngineOutput engineOutput = remoteEngine.start(engineConfig); workerRuntime.setTransporter(engineOutput.getTransporter()); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/WorkerActor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/WorkerActor.java index 6288fd34..052b2a4a 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/WorkerActor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/WorkerActor.java @@ -5,6 +5,7 @@ import tech.powerjob.common.request.*; import tech.powerjob.common.response.AskResponse; import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.remote.framework.actor.Handler; +import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.container.OmsContainerFactory; import static tech.powerjob.common.RemoteConstant.*; @@ -19,9 +20,12 @@ import static tech.powerjob.common.RemoteConstant.*; @Actor(path = WORKER_PATH) public class WorkerActor { + private final WorkerRuntime workerRuntime; private final TaskTrackerActor taskTrackerActor; - public WorkerActor(TaskTrackerActor taskTrackerActor) { + + public WorkerActor(WorkerRuntime workerRuntime, TaskTrackerActor taskTrackerActor) { + this.workerRuntime = workerRuntime; this.taskTrackerActor = taskTrackerActor; } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java index 961aacec..a29de72a 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java @@ -4,6 +4,7 @@ import com.google.common.collect.Lists; import lombok.Getter; import lombok.Setter; import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.enums.Protocol; import tech.powerjob.worker.common.constants.StoreStrategy; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.WorkflowContext; @@ -35,6 +36,10 @@ public class PowerJobWorkerConfig { * Do not mistake for ActorSystem port. Do not add any prefix, i.e. http://. */ private List serverAddress = Lists.newArrayList(); + /** + * Protocol for communication between WORKER and server + */ + private Protocol protocol = Protocol.AKKA; /** * Max length of response result. Result that is longer than the value will be truncated. * {@link ProcessResult} max length for #msg diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java index 0bc7ea08..b9c4b287 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java @@ -106,10 +106,10 @@ public class TransportUtils { public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) { HandlerLocation handlerLocation = new HandlerLocation() - .setServerType(serverType) .setRootPath(rootPath) .setMethodPath(handlerPath); return new URL() + .setServerType(serverType) .setAddress(Address.fromIpv4(address)) .setLocation(handlerLocation); }