diff --git a/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java b/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java index dbc6c8a4..bfd4246a 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java @@ -33,4 +33,47 @@ public class RemoteConstant { /* ************************ OTHERS ************************ */ public static final String EMPTY_ADDRESS = "N/A"; public static final long DEFAULT_TIMEOUT_MS = 5000; + + /* ************************ SERVER ************************ */ + public static final String SERVER_PATH = "server"; + public static final String SERVER_HANDLER_REPORT_LOG = "reportLog"; + + /* ************************ Worker-TaskTracker ************************ */ + public static final String WTT_PATH = "taskTracker"; + + /** + * server 任务执行命令 + */ + public static final String WTT_HANDLER_RUN_JOB = "runJob"; + /** + * server 停止任务实例命令 + */ + public static final String WTT_HANDLER_STOP_INSTANCE = "stopInstance"; + + /** + * sever 查询任务状态 + */ + public static final String WTT_HANDLER_QUERY_INSTANCE_STATUS = "queryInstanceStatus"; + + /** + * PT 上报任务状态,包含执行结果 + */ + public static final String WTT_HANDLER_REPORT_TASK_STATUS = "reportTaskStatus"; + /** + * PT 上报自身状态 + */ + public static final String WTT_HANDLER_REPORT_PROCESSOR_TRACKER_STATUS = "reportProcessorTrackerStatus"; + + /** + * Map 任务 + */ + public static final String WTT_HANDLER_MAP_TASK = "mapTask"; + + /* ************************ Worker-ProcessorTracker ************************ */ + public static final String WPT_PATH = "processorTracker"; + + public static final String WPT_HANDLER_START_TASK = "startTask"; + + public static final String WPT_HANDLER_STOP_INSTANCE = "stopInstance"; + } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java index 71edbc36..3a960b96 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/base/Address.java @@ -22,4 +22,19 @@ public class Address implements Serializable { public String toFullAddress() { return String.format("%s:%d", host, port); } + + public static Address fromIpv4(String ipv4) { + String[] split = ipv4.split(":"); + return new Address() + .setHost(split[0]) + .setPort(Integer.parseInt(split[1])); + } + + @Override + public String toString() { + return "Address{" + + "host='" + host + '\'' + + ", port=" + port + + '}'; + } } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/RemoteEngine.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/RemoteEngine.java index dbba502b..a591f2a9 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/RemoteEngine.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/RemoteEngine.java @@ -9,4 +9,6 @@ package tech.powerjob.remote.framework.engine; public interface RemoteEngine { EngineOutput start(EngineConfig engineConfig); + + void close(); } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java index 8fdd9c28..5292d4e7 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java @@ -51,4 +51,9 @@ public class PowerJobRemoteEngine implements RemoteEngine { return engineOutput; } + + @Override + public void close() { + + } } diff --git a/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/base/AddressTest.java b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/base/AddressTest.java new file mode 100644 index 00000000..f209de67 --- /dev/null +++ b/powerjob-remote/powerjob-remote-framework/src/test/java/tech/powerjob/remote/framework/base/AddressTest.java @@ -0,0 +1,24 @@ +package tech.powerjob.remote.framework.base; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * test address + * + * @author tjq + * @since 2023/1/20 + */ +@Slf4j +class AddressTest { + + @Test + void testAddress() { + String ip = "192.168.1.1:10085"; + final Address address = Address.fromIpv4(ip); + log.info("[AddressTest] parse address: {}", address); + assert ip.equals(address.toFullAddress()); + } +} \ No newline at end of file diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index be2e8535..73cbd036 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -15,12 +15,12 @@ 5.3.23 - 4.2.1 2.1.214 4.0.3 5.9.1 1.2.9 + 4.2.1 @@ -35,8 +35,8 @@ tech.powerjob - powerjob-common - ${powerjob.common.version} + powerjob-remote-framework + ${powerjob-remote-framework.version} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index ef88197f..15130e53 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -6,6 +6,7 @@ import akka.actor.DeadLetter; import akka.actor.Props; import akka.routing.RoundRobinPool; import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -22,9 +23,14 @@ import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.HttpUtils; import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.remote.framework.base.Address; +import tech.powerjob.remote.framework.base.ServerType; +import tech.powerjob.remote.framework.engine.EngineConfig; +import tech.powerjob.remote.framework.engine.EngineOutput; +import tech.powerjob.remote.framework.engine.RemoteEngine; +import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; import tech.powerjob.worker.actors.ProcessorTrackerActor; import tech.powerjob.worker.actors.TaskTrackerActor; -import tech.powerjob.worker.actors.TroubleshootingActor; import tech.powerjob.worker.actors.WorkerActor; import tech.powerjob.worker.background.OmsLogHandler; import tech.powerjob.worker.background.ServerDiscoveryService; @@ -52,6 +58,7 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean private final WorkerRuntime workerRuntime = new WorkerRuntime(); + private RemoteEngine remoteEngine; private final AtomicBoolean initialized = new AtomicBoolean(); @Override @@ -99,16 +106,15 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean workerRuntime.setExecutorManager(executorManager); // 初始化 ActorSystem(macOS上 new ServerSocket 检测端口占用的方法并不生效,可能是AKKA是Scala写的缘故?没办法...只能靠异常重试了) - Map overrideConfig = Maps.newHashMap(); - overrideConfig.put("akka.remote.artery.canonical.hostname", NetUtils.getLocalHost()); - overrideConfig.put("akka.remote.artery.canonical.port", config.getPort()); + EngineConfig engineConfig = new EngineConfig() + .setType("") + .setServerType(ServerType.WORKER) + .setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort())) + .setActorList(Lists.newArrayList()); - Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME); - Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); - - int cores = Runtime.getRuntime().availableProcessors(); - ActorSystem actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig); - workerRuntime.setActorSystem(actorSystem); + remoteEngine = new PowerJobRemoteEngine(); + EngineOutput engineOutput = remoteEngine.start(engineConfig); + workerRuntime.setTransporter(engineOutput.getTransporter()); // 连接 server ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig()); @@ -116,25 +122,10 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean serverDiscoveryService.start(workerRuntime.getExecutorManager().getCoreExecutor()); workerRuntime.setServerDiscoveryService(serverDiscoveryService); - ActorRef taskTrackerActorRef = actorSystem.actorOf(TaskTrackerActor.props(workerRuntime) - .withDispatcher("akka.task-tracker-dispatcher") - .withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.TASK_TRACKER_ACTOR_NAME); - actorSystem.actorOf(ProcessorTrackerActor.props(workerRuntime) - .withDispatcher("akka.processor-tracker-dispatcher") - .withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); - actorSystem.actorOf(WorkerActor.props(taskTrackerActorRef) - .withDispatcher("akka.worker-common-dispatcher") - .withRouter(new RoundRobinPool(cores)), RemoteConstant.WORKER_ACTOR_NAME); - - // 处理系统中产生的异常情况 - ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(TroubleshootingActor.class), RemoteConstant.TROUBLESHOOTING_ACTOR_NAME); - actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class); - - log.info("[PowerJobWorker] akka-remote listening address: {}", workerAddress); - log.info("[PowerJobWorker] akka ActorSystem({}) initialized successfully.", actorSystem); + log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully."); // 初始化日志系统 - OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, actorSystem, serverDiscoveryService); + OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, workerRuntime.getTransporter(), serverDiscoveryService); workerRuntime.setOmsLogHandler(omsLogHandler); // 初始化存储 @@ -194,6 +185,6 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean @Override public void destroy() throws Exception { workerRuntime.getExecutorManager().shutdown(); - workerRuntime.getActorSystem().terminate(); + remoteEngine.close(); } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java index 3c0bffbc..bcc76dd4 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java @@ -2,6 +2,10 @@ package tech.powerjob.worker.actors; import akka.actor.AbstractActor; import akka.actor.Props; +import tech.powerjob.common.RemoteConstant; +import tech.powerjob.remote.framework.actor.Actor; +import tech.powerjob.remote.framework.actor.Handler; +import tech.powerjob.remote.framework.actor.ProcessType; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.core.tracker.processor.ProcessorTracker; import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager; @@ -21,29 +25,21 @@ import java.util.List; * @since 2020/3/17 */ @Slf4j -@AllArgsConstructor -public class ProcessorTrackerActor extends AbstractActor { +@Actor(path = RemoteConstant.WPT_PATH) +public class ProcessorTrackerActor { private final WorkerRuntime workerRuntime; - public static Props props(WorkerRuntime workerRuntime) { - return Props.create(ProcessorTrackerActor.class, () -> new ProcessorTrackerActor(workerRuntime)); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .match(TaskTrackerStartTaskReq.class, this::onReceiveTaskTrackerStartTaskReq) - .match(TaskTrackerStopInstanceReq.class, this::onReceiveTaskTrackerStopInstanceReq) - .matchAny(obj -> log.warn("[ProcessorTrackerActor] receive unknown request: {}.", obj)) - .build(); + public ProcessorTrackerActor(WorkerRuntime workerRuntime) { + this.workerRuntime = workerRuntime; } /** * 处理来自TaskTracker的task执行请求 * @param req 请求 */ - private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) { + @Handler(path = RemoteConstant.WPT_HANDLER_START_TASK, processType = ProcessType.NO_BLOCKING) + public void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) { Long instanceId = req.getInstanceInfo().getInstanceId(); @@ -68,7 +64,8 @@ public class ProcessorTrackerActor extends AbstractActor { * 处理来自TaskTracker停止任务的请求 * @param req 请求 */ - private void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) { + @Handler(path = RemoteConstant.WPT_HANDLER_STOP_INSTANCE) + public void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) { Long instanceId = req.getInstanceId(); List removedPts = ProcessorTrackerManager.removeProcessorTracker(instanceId); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java index c95a585b..8b920b1c 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java @@ -5,6 +5,7 @@ import akka.actor.Props; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.InstanceDetail; @@ -12,6 +13,8 @@ import tech.powerjob.common.request.ServerQueryInstanceStatusReq; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.ServerStopInstanceReq; import tech.powerjob.common.response.AskResponse; +import tech.powerjob.remote.framework.actor.Actor; +import tech.powerjob.remote.framework.actor.Handler; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager; @@ -26,6 +29,8 @@ import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import java.util.List; +import static tech.powerjob.common.RemoteConstant.*; + /** * worker 的 master 节点,处理来自 server 的 jobInstance 请求和来自 worker 的task 请求 * @@ -33,47 +38,33 @@ import java.util.List; * @since 2020/3/17 */ @Slf4j -@AllArgsConstructor -public class TaskTrackerActor extends AbstractActor { +@Actor(path = WTT_PATH) +public class TaskTrackerActor { private final WorkerRuntime workerRuntime; - public static Props props(WorkerRuntime workerRuntime) { - return Props.create(TaskTrackerActor.class, () -> new TaskTrackerActor(workerRuntime)); + public TaskTrackerActor(WorkerRuntime workerRuntime) { + this.workerRuntime = workerRuntime; } - @Override - public Receive createReceive() { - return receiveBuilder() - .match(ProcessorReportTaskStatusReq.class, this::onReceiveProcessorReportTaskStatusReq) - .match(ServerScheduleJobReq.class, this::onReceiveServerScheduleJobReq) - .match(ProcessorMapTaskRequest.class, this::onReceiveProcessorMapTaskRequest) - .match(ProcessorTrackerStatusReportReq.class, this::onReceiveProcessorTrackerStatusReportReq) - .match(ServerStopInstanceReq.class, this::onReceiveServerStopInstanceReq) - .match(ServerQueryInstanceStatusReq.class, this::onReceiveServerQueryInstanceStatusReq) - .matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj)) - .build(); - } - - /** * 子任务状态上报 处理器 */ - private void onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) { + @Handler(path = WTT_HANDLER_REPORT_TASK_STATUS) + public AskResponse onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) { int taskStatus = req.getStatus(); // 只有重量级任务才会有两级任务状态上报的机制 HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); // 结束状态需要回复接受成功 if (TaskStatus.FINISHED_STATUS.contains(taskStatus)) { - AskResponse askResponse = AskResponse.succeed(null); - getSender().tell(askResponse, getSelf()); + return AskResponse.succeed(null); } // 手动停止 TaskTracker 的情况下会出现这种情况 if (taskTracker == null) { log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req); - return; + return null; } if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) { @@ -84,17 +75,20 @@ public class TaskTrackerActor extends AbstractActor { // 更新工作流上下文 taskTracker.updateAppendedWfContext(req.getAppendedWfContext()); + + return null; } /** * 子任务 map 处理器 */ - private void onReceiveProcessorMapTaskRequest(ProcessorMapTaskRequest req) { + @Handler(path = WTT_HANDLER_MAP_TASK) + public AskResponse onReceiveProcessorMapTaskRequest(ProcessorMapTaskRequest req) { HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); if (taskTracker == null) { log.warn("[TaskTrackerActor] receive ProcessorMapTaskRequest({}) but system can't find TaskTracker.", req); - return; + return null; } boolean success = false; @@ -121,13 +115,14 @@ public class TaskTrackerActor extends AbstractActor { AskResponse response = new AskResponse(); response.setSuccess(success); - getSender().tell(response, getSelf()); + return response; } /** * 服务器任务调度处理器 */ - private void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) { + @Handler(path = WTT_HANDLER_RUN_JOB) + public void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) { log.debug("[TaskTrackerActor] server schedule job by request: {}.", req); Long instanceId = req.getInstanceId(); // 区分轻量级任务模型以及重量级任务模型 @@ -168,7 +163,8 @@ public class TaskTrackerActor extends AbstractActor { /** * ProcessorTracker 心跳处理器 */ - private void onReceiveProcessorTrackerStatusReportReq(ProcessorTrackerStatusReportReq req) { + @Handler(path = WTT_HANDLER_REPORT_PROCESSOR_TRACKER_STATUS) + public void onReceiveProcessorTrackerStatusReportReq(ProcessorTrackerStatusReportReq req) { HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); if (taskTracker == null) { @@ -181,8 +177,8 @@ public class TaskTrackerActor extends AbstractActor { /** * 停止任务实例 */ - private void onReceiveServerStopInstanceReq(ServerStopInstanceReq req) { - + @Handler(path = WTT_HANDLER_STOP_INSTANCE) + public void onReceiveServerStopInstanceReq(ServerStopInstanceReq req) { log.info("[TaskTrackerActor] receive ServerStopInstanceReq({}).", req); HeavyTaskTracker heavyTaskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); @@ -201,7 +197,8 @@ public class TaskTrackerActor extends AbstractActor { /** * 查询任务实例运行状态 */ - private void onReceiveServerQueryInstanceStatusReq(ServerQueryInstanceStatusReq req) { + @Handler(path = WTT_HANDLER_QUERY_INSTANCE_STATUS) + public AskResponse onReceiveServerQueryInstanceStatusReq(ServerQueryInstanceStatusReq req) { AskResponse askResponse; TaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); if (taskTracker == null && (taskTracker = LightTaskTrackerManager.getTaskTracker(req.getInstanceId())) == null) { @@ -211,7 +208,7 @@ public class TaskTrackerActor extends AbstractActor { InstanceDetail instanceDetail = taskTracker.fetchRunningStatus(); askResponse = AskResponse.succeed(instanceDetail); } - getSender().tell(askResponse, getSelf()); + return askResponse; } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TroubleshootingActor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TroubleshootingActor.java deleted file mode 100644 index 1996f7cd..00000000 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TroubleshootingActor.java +++ /dev/null @@ -1,25 +0,0 @@ -package tech.powerjob.worker.actors; - -import akka.actor.AbstractActor; -import akka.actor.DeadLetter; -import lombok.extern.slf4j.Slf4j; - -/** - * 处理系统异常的 Actor - * - * @author 朱八 - * @since 2020/7/16 - */ -@Slf4j -public class TroubleshootingActor extends AbstractActor { - @Override - public Receive createReceive() { - return receiveBuilder() - .match(DeadLetter.class, this::onReceiveDeadLetter) - .build(); - } - - public void onReceiveDeadLetter(DeadLetter dl) { - log.warn("[TroubleshootingActor] receive DeadLetter: {}", dl); - } -} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java index 54010994..5ea219cc 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java @@ -1,15 +1,14 @@ package tech.powerjob.worker.background; -import akka.actor.ActorSelection; -import akka.actor.ActorSystem; import tech.powerjob.common.enums.LogLevel; import tech.powerjob.common.model.InstanceLogContent; import tech.powerjob.common.request.WorkerLogReportReq; -import tech.powerjob.worker.common.utils.AkkaUtils; +import tech.powerjob.remote.framework.transporter.Transporter; import com.google.common.collect.Lists; import com.google.common.collect.Queues; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import tech.powerjob.worker.common.utils.TransportUtils; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -27,7 +26,7 @@ import java.util.concurrent.locks.ReentrantLock; public class OmsLogHandler { private final String workerAddress; - private final ActorSystem actorSystem; + private final Transporter transporter; private final ServerDiscoveryService serverDiscoveryService; // 处理线程,需要通过线程池启动 @@ -42,9 +41,9 @@ public class OmsLogHandler { // 本地囤积阈值 private static final int REPORT_SIZE = 1024; - public OmsLogHandler(String workerAddress, ActorSystem actorSystem, ServerDiscoveryService serverDiscoveryService) { + public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) { this.workerAddress = workerAddress; - this.actorSystem = actorSystem; + this.transporter = transporter; this.serverDiscoveryService = serverDiscoveryService; } @@ -81,9 +80,9 @@ public class OmsLogHandler { try { - String serverPath = AkkaUtils.getServerActorPath(serverDiscoveryService.getCurrentServerAddress()); + final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress(); // 当前无可用 Server - if (StringUtils.isEmpty(serverPath)) { + if (StringUtils.isEmpty(currentServerAddress)) { if (!logQueue.isEmpty()) { logQueue.clear(); log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs."); @@ -91,7 +90,6 @@ public class OmsLogHandler { return; } - ActorSelection serverActor = actorSystem.actorSelection(serverPath); List logs = Lists.newLinkedList(); while (!logQueue.isEmpty()) { @@ -102,7 +100,7 @@ public class OmsLogHandler { if (logs.size() >= BATCH_SIZE) { WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs)); // 不可靠请求,WEB日志不追求极致 - serverActor.tell(req, null); + TransportUtils.reportLogs(req, currentServerAddress, transporter); logs.clear(); } @@ -113,7 +111,7 @@ public class OmsLogHandler { if (!logs.isEmpty()) { WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs); - serverActor.tell(req, null); + TransportUtils.reportLogs(req, currentServerAddress, transporter); } }finally { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/WorkerRuntime.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/WorkerRuntime.java index f7b07c41..5e877e3b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/WorkerRuntime.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/WorkerRuntime.java @@ -1,6 +1,6 @@ package tech.powerjob.worker.common; -import akka.actor.ActorSystem; +import tech.powerjob.remote.framework.transporter.Transporter; import tech.powerjob.worker.background.OmsLogHandler; import tech.powerjob.worker.background.ServerDiscoveryService; import tech.powerjob.worker.background.WorkerHealthReporter; @@ -23,7 +23,7 @@ public class WorkerRuntime { private PowerJobWorkerConfig workerConfig; - private ActorSystem actorSystem; + private Transporter transporter; private WorkerHealthReporter healthReporter; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java new file mode 100644 index 00000000..457d31e9 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java @@ -0,0 +1,86 @@ +package tech.powerjob.worker.common.utils; + +import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.exception.PowerJobCheckedException; +import tech.powerjob.common.request.WorkerLogReportReq; +import tech.powerjob.common.response.AskResponse; +import tech.powerjob.remote.framework.base.Address; +import tech.powerjob.remote.framework.base.HandlerLocation; +import tech.powerjob.remote.framework.base.ServerType; +import tech.powerjob.remote.framework.base.URL; +import tech.powerjob.remote.framework.transporter.Transporter; +import tech.powerjob.worker.common.WorkerRuntime; +import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest; +import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; +import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static tech.powerjob.common.RemoteConstant.*; + +/** + * 通讯工具 + * + * @author tjq + * @since 2023/1/20 + */ +@Slf4j +public class TransportUtils { + + public static void ptReportTask(ProcessorReportTaskStatusReq req, String address, WorkerRuntime workerRuntime) { + final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_TASK_STATUS, address); + workerRuntime.getTransporter().tell(url, req); + } + + public static void ptReportSelfStatus(ProcessorTrackerStatusReportReq req, String address, WorkerRuntime workerRuntime) { + final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_PROCESSOR_TRACKER_STATUS, address); + workerRuntime.getTransporter().tell(url, req); + } + + public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) { + final URL url = easyBuildUrl(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_REPORT_LOG, address); + transporter.tell(url, req); + } + + public static boolean reliablePtReportTask(ProcessorReportTaskStatusReq req, String address, WorkerRuntime workerRuntime) { + try { + final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_TASK_STATUS, address); + final CompletionStage completionStage = workerRuntime.getTransporter().ask(url, req, AskResponse.class); + return completionStage + .toCompletableFuture() + .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .isSuccess(); + + } catch (Exception e) { + log.warn("[PowerJobTransport] reliablePtReportTask failed: {}", req, e); + return false; + } + } + + public static boolean reliableMapTask(ProcessorMapTaskRequest req, String address, WorkerRuntime workerRuntime) throws PowerJobCheckedException { + try { + final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address); + final CompletionStage completionStage = workerRuntime.getTransporter().ask(url, req, AskResponse.class); + return completionStage + .toCompletableFuture() + .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .isSuccess(); + + } catch (Throwable throwable) { + throw new PowerJobCheckedException(throwable); + } + } + + public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) { + HandlerLocation handlerLocation = new HandlerLocation() + .setServerType(serverType) + .setRootPath(rootPath) + .setMethodPath(handlerPath); + return new URL() + .setAddress(Address.fromIpv4(address)) + .setLocation(handlerLocation); + } + +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java index d42ed61b..454895f3 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java @@ -1,6 +1,5 @@ package tech.powerjob.worker.core.processor.runnable; -import akka.actor.ActorSelection; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.ThreadLocalStore; @@ -8,6 +7,7 @@ import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.common.serialize.SerializerUtils; +import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.common.utils.WorkflowContextUtils; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.TaskContext; @@ -45,7 +45,7 @@ public class HeavyProcessorRunnable implements Runnable { private final InstanceInfo instanceInfo; - private final ActorSelection taskTrackerActor; + private final String taskTrackerAddress; private final TaskDO task; private final BasicProcessor processor; private final OmsLogger omsLogger; @@ -222,14 +222,14 @@ public class HeavyProcessorRunnable implements Runnable { // 最终结束状态要求可靠发送 if (TaskStatus.FINISHED_STATUS.contains(status.getValue())) { - boolean success = AkkaUtils.reliableTransmit(taskTrackerActor, req); + boolean success = TransportUtils.reliablePtReportTask(req, taskTrackerAddress, workerRuntime); if (!success) { // 插入重试队列,等待重试 statusReportRetryQueue.add(req); log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed, will retry later", task.getInstanceId(), task.getTaskId(), status, result); } } else { - taskTrackerActor.tell(req, null); + TransportUtils.ptReportTask(req, taskTrackerAddress, workerRuntime); } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java index d8d8e752..3cea7976 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java @@ -9,6 +9,7 @@ import tech.powerjob.worker.common.ThreadLocalStore; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.utils.AkkaUtils; +import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest; @@ -55,8 +56,7 @@ public interface MapProcessor extends BasicProcessor { ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName); // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.TASK_TRACKER_ACTOR_NAME); - boolean requestSucceed = AkkaUtils.reliableTransmit(workerRuntime.getActorSystem().actorSelection(akkaRemotePath), req); + boolean requestSucceed = TransportUtils.reliableMapTask(req, task.getAddress(), workerRuntime); if (requestSucceed) { log.info("[Map-{}] map task[name={},num={}] successfully!", task.getInstanceId(), taskName, taskList.size()); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java index f3a54543..05ebe598 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -1,6 +1,5 @@ package tech.powerjob.worker.core.tracker.processor; -import akka.actor.ActorSelection; import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; @@ -16,6 +15,7 @@ import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.AkkaUtils; +import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.core.processor.ProcessorInfo; import tech.powerjob.worker.core.processor.runnable.HeavyProcessorRunnable; import tech.powerjob.worker.core.processor.ProcessorLoader; @@ -76,8 +76,6 @@ public class ProcessorTracker { private String taskTrackerAddress; - private ActorSelection taskTrackerActorRef; - private ThreadPoolExecutor threadPool; private ScheduledExecutorService timingPool; @@ -107,9 +105,6 @@ public class ProcessorTracker { this.instanceId = request.getInstanceInfo().getInstanceId(); this.taskTrackerAddress = request.getTaskTrackerAddress(); - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME); - this.taskTrackerActorRef = workerRuntime.getActorSystem().actorSelection(akkaRemotePath); - this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime); this.statusReportRetryQueue = Queues.newLinkedBlockingQueue(); this.lastIdleTime = -1L; @@ -152,7 +147,7 @@ public class ProcessorTracker { .setResult(lethalReason) .setReportTime(System.currentTimeMillis()); - taskTrackerActorRef.tell(report, null); + TransportUtils.ptReportTask(report, taskTrackerAddress, workerRuntime); return; } @@ -162,7 +157,7 @@ public class ProcessorTracker { newTask.setAddress(taskTrackerAddress); ClassLoader classLoader = processorInfo.getClassLoader(); - HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processorInfo.getBasicProcessor(), omsLogger, classLoader, statusReportRetryQueue, workerRuntime); + HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorInfo.getBasicProcessor(), omsLogger, classLoader, statusReportRetryQueue, workerRuntime); try { threadPool.submit(heavyProcessorRunnable); success = true; @@ -182,7 +177,7 @@ public class ProcessorTracker { reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); reportReq.setReportTime(System.currentTimeMillis()); - taskTrackerActorRef.tell(reportReq, null); + TransportUtils.ptReportTask(reportReq, taskTrackerAddress, workerRuntime); log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.", instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size()); @@ -203,7 +198,6 @@ public class ProcessorTracker { }); // 2. 去除顶层引用,送入GC世界 - taskTrackerActorRef = null; statusReportRetryQueue.clear(); ProcessorTrackerManager.removeProcessorTracker(instanceId); @@ -281,7 +275,7 @@ public class ProcessorTracker { // 不可靠通知,如果该请求失败,则整个任务处理集群缺失一个 ProcessorTracker,影响可接受 ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId); statusReportReq.setAddress(workerRuntime.getWorkerAddress()); - taskTrackerActorRef.tell(statusReportReq, null); + TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime); destroy(); return; } @@ -293,7 +287,7 @@ public class ProcessorTracker { ProcessorReportTaskStatusReq req = statusReportRetryQueue.poll(); if (req != null) { req.setReportTime(System.currentTimeMillis()); - if (!AkkaUtils.reliableTransmit(taskTrackerActorRef, req)) { + if (!TransportUtils.reliablePtReportTask(req, taskTrackerAddress, workerRuntime)) { statusReportRetryQueue.add(req); log.warn("[ProcessorRunnable-{}] retry report finished task status failed: {}", instanceId, req); return; @@ -305,7 +299,7 @@ public class ProcessorTracker { long waitingNum = threadPool.getQueue().size(); ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum); statusReportReq.setAddress(workerRuntime.getWorkerAddress()); - taskTrackerActorRef.tell(statusReportReq, null); + TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime); log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum); }