fix remote bug -> akka remote path is akka://%s@%s:%d/user/%s

This commit is contained in:
tjq 2020-03-24 20:29:06 +08:00
parent a18a5226a4
commit a6482edd38
10 changed files with 180 additions and 30 deletions

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.worker;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor;
@ -13,6 +14,8 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
@ -31,9 +34,12 @@ import java.util.Map;
@Slf4j
public class OhMyWorker implements ApplicationContextAware, InitializingBean {
@Getter
private static OhMyConfig config;
public static ActorSystem actorSystem;
public static ActorRef processorTracker;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringUtils.inject(applicationContext);
@ -54,17 +60,17 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
// 初始化 ActorSystem
Map<String, Object> overrideConfig = Maps.newHashMap();
String localIP = StringUtils.isEmpty(config.getListeningIP()) ? NetUtils.getLocalHost() : config.getListeningIP();
int port = config.getListeningPort() == null ? AkkaConstant.DEFAULT_PORT : config.getListeningPort();
overrideConfig.put("akka.remote.artery.canonical.hostname", localIP);
if (config.getListeningPort() != null) {
overrideConfig.put("akka.remote.artery.canonical.port", config.getListeningPort());
}
log.info("[OhMyWorker] akka-remote listening address config: {}", overrideConfig);
overrideConfig.put("akka.remote.artery.canonical.port", port);
log.info("[OhMyWorker] akka-remote listening address: {}:{}", localIP, port);
Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
actorSystem = ActorSystem.create(AkkaConstant.ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(TaskTrackerActor.class));
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class));
actorSystem.actorOf(Props.create(TaskTrackerActor.class), AkkaConstant.Task_TRACKER_ACTOR_NAME);
processorTracker = actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME);
log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
// 初始化存储
@ -76,26 +82,9 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
}catch (Exception e) {
log.error("[OhMyWorker] initialize OhMyWorker failed, using {}.", stopwatch, e);
}
}
public static OhMyConfig getConfig() {
return config;
}
public void setConfig(OhMyConfig cfg) {
config = cfg;
}
public static void main(String[] args) {
System.out.println(org.h2.util.NetUtils.getLocalAddress());
OhMyConfig config = new OhMyConfig();
config.setAppName("oms");
OhMyWorker ohMyWorker = new OhMyWorker();
ohMyWorker.setConfig(config);
ohMyWorker.init();
public void setConfig(OhMyConfig config) {
OhMyWorker.config = config;
}
}

View File

@ -8,6 +8,11 @@ package com.github.kfcfans.oms.worker.common.constants;
*/
public class AkkaConstant {
/**
* 默认端口
*/
public static final int DEFAULT_PORT = 25520;
/**
* 顶层ActoractorSystem名称
*/

View File

@ -14,10 +14,12 @@ public class AkkaUtils {
/**
* akka://<actor system>@<hostname>:<port>/<actor path>
*/
private static final String AKKA_REMOTE_NODE_PATH = "akka://%s@%s:%d/%s";
private static final String AKKA_REMOTE_NODE_PATH = "akka://%s@%s:%d/user/%s";
public static String getAkkaRemotePath(String ip, String actorName) {
return String.format(AKKA_REMOTE_NODE_PATH, AkkaConstant.ACTOR_SYSTEM_NAME, ip, OhMyWorker.getConfig().getListeningPort(), actorName);
Integer configPort = OhMyWorker.getConfig().getListeningPort();
int port = configPort == null ? AkkaConstant.DEFAULT_PORT : configPort;
return String.format(AKKA_REMOTE_NODE_PATH, AkkaConstant.ACTOR_SYSTEM_NAME, ip, port, actorName);
}
}

View File

@ -4,7 +4,11 @@ import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.io.Serializable;
/**
* JobTracker 派发 task 进行执行
@ -12,9 +16,9 @@ import lombok.NoArgsConstructor;
* @author tjq
* @since 2020/3/17
*/
@Data
@NoArgsConstructor
public class TaskTrackerStartTaskReq {
@Getter
@Setter
public class TaskTrackerStartTaskReq implements Serializable {
private String jobId;
private String instanceId;
@ -42,6 +46,9 @@ public class TaskTrackerStartTaskReq {
// 子任务当前重试次数
private int currentRetryTimes;
public TaskTrackerStartTaskReq() {
}
public TaskTrackerStartTaskReq(JobInstanceInfo instanceInfo, TaskDO task) {
jobId = instanceInfo.getJobId();
instanceId = instanceInfo.getInstanceId();

View File

@ -2,6 +2,7 @@ akka {
actor {
# cluster is better(recommend by official document), but I prefer remote
provider = remote
allow-java-serialization = on
}
remote {
artery {

View File

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- %m输出的信息,%p日志级别,%t线程名,%d日期,%c类的全名,%i索引【从数字0开始递增】,,, -->
<!-- appender是configuration的子节点是负责写日志的组件。 -->
<!-- ConsoleAppender把日志输出到控制台 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d [%t] %-5level %logger{36}.%M\(%file:%line\) - %msg%n</pattern>
<!-- 控制台也要使用UTF-8不要使用GBK否则会中文乱码 -->
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="com.github.kfcfans.oms" level="INFO" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>
<!-- 控制台输出日志级别 -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

View File

@ -0,0 +1,70 @@
package com.github.kfcfans.oms;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.oms.worker.common.constants.AkkaConstant;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.typesafe.config.ConfigFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
* 测试任务的启动
*
* @author tjq
* @since 2020/3/24
*/
public class ProcessorTrackerTest {
@BeforeAll
public static void startWorker() throws Exception {
OhMyConfig ohMyConfig = new OhMyConfig();
ohMyConfig.setAppName("oms-test");
OhMyWorker worker = new OhMyWorker();
worker.setConfig(ohMyConfig);
worker.init();
}
@AfterAll
public static void stop() throws Exception {
Thread.sleep(120000);
}
@Test
public void testProcessor() throws Exception {
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection remoteActor = testAS.actorSelection(akkaRemotePath);
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
req.setJobId("1");
req.setInstanceId("10086");
req.setTaskId("0");
req.setTaskName("ROOT_TASK");
req.setMaxRetryTimes(3);
req.setCurrentRetryTimes(0);
req.setExecuteType(ExecuteType.STANDALONE.name());
req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor");
req.setThreadConcurrency(5);
req.setTaskTrackerAddress("192.168.1.2");
req.setJobTimeLimitMS(123132);
remoteActor.tell(req, null);
Thread.sleep(120000);
}
}

View File

@ -0,0 +1,18 @@
package com.github.kfcfans.oms;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import org.junit.jupiter.api.Test;
/**
* 测试工具类
*
* @author tjq
* @since 2020/3/24
*/
public class UtilsTest {
@Test
public void testNetUtils() throws Exception {
System.out.println("本机IP" + NetUtils.getLocalHost());
}
}

View File

@ -0,0 +1,21 @@
package com.github.kfcfans.oms.processors;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor;
/**
* 测试用的基础处理器
*
* @author tjq
* @since 2020/3/24
*/
public class TestBasicProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
System.out.println("==== ProcessResult#process");
System.out.println("TaskContext: " + context.toString());
return new ProcessResult(true, "success");
}
}

View File

@ -0,0 +1,14 @@
akka {
actor {
# for test
provider = remote
allow-java-serialization = on
}
remote {
artery {
transport = tcp # See Selecting a transport below
canonical.hostname = "127.0.0.1"
canonical.port = 25521
}
}
}