From a6482edd38b28ade4dc995a2a9b89abca4427c85 Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 24 Mar 2020 20:29:06 +0800 Subject: [PATCH] fix remote bug -> akka remote path is akka://%s@%s:%d/user/%s --- .../github/kfcfans/oms/worker/OhMyWorker.java | 39 ++++------- .../worker/common/constants/AkkaConstant.java | 5 ++ .../oms/worker/common/utils/AkkaUtils.java | 6 +- .../pojo/request/TaskTrackerStartTaskReq.java | 13 +++- .../main/resources/oms-akka-application.conf | 1 + .../src/main/resources/oms-logback.xml | 23 ++++++ .../kfcfans/oms/ProcessorTrackerTest.java | 70 +++++++++++++++++++ .../com/github/kfcfans/oms/UtilsTest.java | 18 +++++ .../oms/processors/TestBasicProcessor.java | 21 ++++++ .../src/test/resources/oms-akka-test.conf | 14 ++++ 10 files changed, 180 insertions(+), 30 deletions(-) create mode 100644 oh-my-scheduler-worker/src/main/resources/oms-logback.xml create mode 100644 oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java create mode 100644 oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/UtilsTest.java create mode 100644 oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBasicProcessor.java create mode 100644 oh-my-scheduler-worker/src/test/resources/oms-akka-test.conf diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java index c616d58f..8992f13c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.worker; +import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor; @@ -13,6 +14,8 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; @@ -31,9 +34,12 @@ import java.util.Map; @Slf4j public class OhMyWorker implements ApplicationContextAware, InitializingBean { + @Getter private static OhMyConfig config; public static ActorSystem actorSystem; + public static ActorRef processorTracker; + @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringUtils.inject(applicationContext); @@ -54,17 +60,17 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean { // 初始化 ActorSystem Map overrideConfig = Maps.newHashMap(); String localIP = StringUtils.isEmpty(config.getListeningIP()) ? NetUtils.getLocalHost() : config.getListeningIP(); + int port = config.getListeningPort() == null ? AkkaConstant.DEFAULT_PORT : config.getListeningPort(); overrideConfig.put("akka.remote.artery.canonical.hostname", localIP); - if (config.getListeningPort() != null) { - overrideConfig.put("akka.remote.artery.canonical.port", config.getListeningPort()); - } - log.info("[OhMyWorker] akka-remote listening address config: {}", overrideConfig); + overrideConfig.put("akka.remote.artery.canonical.port", port); + log.info("[OhMyWorker] akka-remote listening address: {}:{}", localIP, port); + Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG_NAME); Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); actorSystem = ActorSystem.create(AkkaConstant.ACTOR_SYSTEM_NAME, akkaFinalConfig); - actorSystem.actorOf(Props.create(TaskTrackerActor.class)); - actorSystem.actorOf(Props.create(ProcessorTrackerActor.class)); + actorSystem.actorOf(Props.create(TaskTrackerActor.class), AkkaConstant.Task_TRACKER_ACTOR_NAME); + processorTracker = actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME); log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem); // 初始化存储 @@ -76,26 +82,9 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean { }catch (Exception e) { log.error("[OhMyWorker] initialize OhMyWorker failed, using {}.", stopwatch, e); } - } - public static OhMyConfig getConfig() { - return config; - } - public void setConfig(OhMyConfig cfg) { - config = cfg; - } - - public static void main(String[] args) { - - System.out.println(org.h2.util.NetUtils.getLocalAddress()); - - OhMyConfig config = new OhMyConfig(); - config.setAppName("oms"); - OhMyWorker ohMyWorker = new OhMyWorker(); - ohMyWorker.setConfig(config); - ohMyWorker.init(); - - + public void setConfig(OhMyConfig config) { + OhMyWorker.config = config; } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java index dacdf97c..61a8bf4c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java @@ -8,6 +8,11 @@ package com.github.kfcfans.oms.worker.common.constants; */ public class AkkaConstant { + /** + * 默认端口 + */ + public static final int DEFAULT_PORT = 25520; + /** * 顶层Actor(actorSystem名称) */ diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java index 6099643a..66dfb88f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java @@ -14,10 +14,12 @@ public class AkkaUtils { /** * akka://@:/ */ - private static final String AKKA_REMOTE_NODE_PATH = "akka://%s@%s:%d/%s"; + private static final String AKKA_REMOTE_NODE_PATH = "akka://%s@%s:%d/user/%s"; public static String getAkkaRemotePath(String ip, String actorName) { - return String.format(AKKA_REMOTE_NODE_PATH, AkkaConstant.ACTOR_SYSTEM_NAME, ip, OhMyWorker.getConfig().getListeningPort(), actorName); + Integer configPort = OhMyWorker.getConfig().getListeningPort(); + int port = configPort == null ? AkkaConstant.DEFAULT_PORT : configPort; + return String.format(AKKA_REMOTE_NODE_PATH, AkkaConstant.ACTOR_SYSTEM_NAME, ip, port, actorName); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java index a363c92a..f8ffc6a7 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java @@ -4,7 +4,11 @@ import com.github.kfcfans.oms.worker.common.utils.NetUtils; import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo; import lombok.Data; +import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.io.Serializable; /** * JobTracker 派发 task 进行执行 @@ -12,9 +16,9 @@ import lombok.NoArgsConstructor; * @author tjq * @since 2020/3/17 */ -@Data -@NoArgsConstructor -public class TaskTrackerStartTaskReq { +@Getter +@Setter +public class TaskTrackerStartTaskReq implements Serializable { private String jobId; private String instanceId; @@ -42,6 +46,9 @@ public class TaskTrackerStartTaskReq { // 子任务当前重试次数 private int currentRetryTimes; + public TaskTrackerStartTaskReq() { + } + public TaskTrackerStartTaskReq(JobInstanceInfo instanceInfo, TaskDO task) { jobId = instanceInfo.getJobId(); instanceId = instanceInfo.getInstanceId(); diff --git a/oh-my-scheduler-worker/src/main/resources/oms-akka-application.conf b/oh-my-scheduler-worker/src/main/resources/oms-akka-application.conf index 7972a078..ee7f4434 100644 --- a/oh-my-scheduler-worker/src/main/resources/oms-akka-application.conf +++ b/oh-my-scheduler-worker/src/main/resources/oms-akka-application.conf @@ -2,6 +2,7 @@ akka { actor { # cluster is better(recommend by official document), but I prefer remote provider = remote + allow-java-serialization = on } remote { artery { diff --git a/oh-my-scheduler-worker/src/main/resources/oms-logback.xml b/oh-my-scheduler-worker/src/main/resources/oms-logback.xml new file mode 100644 index 00000000..16c12a08 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/resources/oms-logback.xml @@ -0,0 +1,23 @@ + + + + + + + + %d [%t] %-5level %logger{36}.%M\(%file:%line\) - %msg%n + + UTF-8 + + + + + + + + + + + + + diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java new file mode 100644 index 00000000..2c5a40f1 --- /dev/null +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java @@ -0,0 +1,70 @@ +package com.github.kfcfans.oms; + +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import com.github.kfcfans.common.ExecuteType; +import com.github.kfcfans.common.ProcessorType; +import com.github.kfcfans.oms.worker.OhMyWorker; +import com.github.kfcfans.oms.worker.common.OhMyConfig; +import com.github.kfcfans.oms.worker.common.constants.AkkaConstant; +import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; +import com.github.kfcfans.oms.worker.common.utils.NetUtils; +import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; +import com.typesafe.config.ConfigFactory; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + + +/** + * 测试任务的启动 + * + * @author tjq + * @since 2020/3/24 + */ +public class ProcessorTrackerTest { + + @BeforeAll + public static void startWorker() throws Exception { + OhMyConfig ohMyConfig = new OhMyConfig(); + ohMyConfig.setAppName("oms-test"); + OhMyWorker worker = new OhMyWorker(); + worker.setConfig(ohMyConfig); + worker.init(); + } + + @AfterAll + public static void stop() throws Exception { + Thread.sleep(120000); + } + + @Test + public void testProcessor() throws Exception { + + + ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); + String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME); + ActorSelection remoteActor = testAS.actorSelection(akkaRemotePath); + + TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq(); + req.setJobId("1"); + req.setInstanceId("10086"); + req.setTaskId("0"); + req.setTaskName("ROOT_TASK"); + req.setMaxRetryTimes(3); + req.setCurrentRetryTimes(0); + + req.setExecuteType(ExecuteType.STANDALONE.name()); + req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name()); + req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor"); + req.setThreadConcurrency(5); + req.setTaskTrackerAddress("192.168.1.2"); + req.setJobTimeLimitMS(123132); + + remoteActor.tell(req, null); + + Thread.sleep(120000); + + } + +} diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/UtilsTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/UtilsTest.java new file mode 100644 index 00000000..2103fff2 --- /dev/null +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/UtilsTest.java @@ -0,0 +1,18 @@ +package com.github.kfcfans.oms; + +import com.github.kfcfans.oms.worker.common.utils.NetUtils; +import org.junit.jupiter.api.Test; + +/** + * 测试工具类 + * + * @author tjq + * @since 2020/3/24 + */ +public class UtilsTest { + + @Test + public void testNetUtils() throws Exception { + System.out.println("本机IP:" + NetUtils.getLocalHost()); + } +} diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBasicProcessor.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBasicProcessor.java new file mode 100644 index 00000000..55d24878 --- /dev/null +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBasicProcessor.java @@ -0,0 +1,21 @@ +package com.github.kfcfans.oms.processors; + +import com.github.kfcfans.oms.worker.sdk.ProcessResult; +import com.github.kfcfans.oms.worker.sdk.TaskContext; +import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor; + +/** + * 测试用的基础处理器 + * + * @author tjq + * @since 2020/3/24 + */ +public class TestBasicProcessor implements BasicProcessor { + + @Override + public ProcessResult process(TaskContext context) throws Exception { + System.out.println("==== ProcessResult#process"); + System.out.println("TaskContext: " + context.toString()); + return new ProcessResult(true, "success"); + } +} diff --git a/oh-my-scheduler-worker/src/test/resources/oms-akka-test.conf b/oh-my-scheduler-worker/src/test/resources/oms-akka-test.conf new file mode 100644 index 00000000..c258dfbe --- /dev/null +++ b/oh-my-scheduler-worker/src/test/resources/oms-akka-test.conf @@ -0,0 +1,14 @@ +akka { + actor { + # for test + provider = remote + allow-java-serialization = on + } + remote { + artery { + transport = tcp # See Selecting a transport below + canonical.hostname = "127.0.0.1" + canonical.port = 25521 + } + } +} \ No newline at end of file