mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
refactor: Rename Task_TRACKER_ACTOR_NAME
This commit is contained in:
parent
65e0d118c3
commit
d07ed2b013
@ -14,7 +14,7 @@ public class RemoteConstant {
|
|||||||
|
|
||||||
public static final String WORKER_ACTOR_SYSTEM_NAME = "oms";
|
public static final String WORKER_ACTOR_SYSTEM_NAME = "oms";
|
||||||
|
|
||||||
public static final String Task_TRACKER_ACTOR_NAME = "task_tracker";
|
public static final String TASK_TRACKER_ACTOR_NAME = "task_tracker";
|
||||||
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
|
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
|
||||||
public static final String WORKER_ACTOR_NAME = "worker";
|
public static final String WORKER_ACTOR_NAME = "worker";
|
||||||
public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting";
|
public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting";
|
||||||
|
@ -90,7 +90,7 @@ public class AkkaStarter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static ActorSelection getTaskTrackerActor(String address) {
|
public static ActorSelection getTaskTrackerActor(String address) {
|
||||||
String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
|
||||||
return actorSystem.actorSelection(path);
|
return actorSystem.actorSelection(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,7 +101,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
|
|||||||
actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
|
actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
|
||||||
actorSystem.actorOf(Props.create(TaskTrackerActor.class)
|
actorSystem.actorOf(Props.create(TaskTrackerActor.class)
|
||||||
.withDispatcher("akka.task-tracker-dispatcher")
|
.withDispatcher("akka.task-tracker-dispatcher")
|
||||||
.withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
.withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.TASK_TRACKER_ACTOR_NAME);
|
||||||
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class)
|
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class)
|
||||||
.withDispatcher("akka.processor-tracker-dispatcher")
|
.withDispatcher("akka.processor-tracker-dispatcher")
|
||||||
.withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
.withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
||||||
|
@ -46,7 +46,7 @@ public abstract class MapProcessor implements BasicProcessor {
|
|||||||
ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);
|
ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);
|
||||||
|
|
||||||
// 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常)
|
// 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常)
|
||||||
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.TASK_TRACKER_ACTOR_NAME);
|
||||||
boolean requestSucceed = AkkaUtils.reliableTransmit(OhMyWorker.actorSystem.actorSelection(akkaRemotePath), req);
|
boolean requestSucceed = AkkaUtils.reliableTransmit(OhMyWorker.actorSystem.actorSelection(akkaRemotePath), req);
|
||||||
|
|
||||||
if (requestSucceed) {
|
if (requestSucceed) {
|
||||||
|
@ -84,7 +84,7 @@ public class ProcessorTracker {
|
|||||||
this.instanceInfo = request.getInstanceInfo();
|
this.instanceInfo = request.getInstanceInfo();
|
||||||
this.instanceId = request.getInstanceInfo().getInstanceId();
|
this.instanceId = request.getInstanceInfo().getInstanceId();
|
||||||
this.taskTrackerAddress = request.getTaskTrackerAddress();
|
this.taskTrackerAddress = request.getTaskTrackerAddress();
|
||||||
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
|
||||||
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
|
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
|
||||||
|
|
||||||
this.omsLogger = new OmsServerLogger(instanceId);
|
this.omsLogger = new OmsServerLogger(instanceId);
|
||||||
|
@ -37,7 +37,7 @@ public class CommonTaskTrackerTest {
|
|||||||
worker.init();
|
worker.init();
|
||||||
|
|
||||||
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
|
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
|
||||||
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
|
||||||
remoteTaskTracker = testAS.actorSelection(akkaRemotePath);
|
remoteTaskTracker = testAS.actorSelection(akkaRemotePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ public class CommonTest {
|
|||||||
String address = NetUtils.getLocalHost() + ":27777";
|
String address = NetUtils.getLocalHost() + ":27777";
|
||||||
|
|
||||||
remoteProcessorTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME));
|
remoteProcessorTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME));
|
||||||
remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.Task_TRACKER_ACTOR_NAME));
|
remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.TASK_TRACKER_ACTOR_NAME));
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterAll
|
@AfterAll
|
||||||
|
@ -35,7 +35,7 @@ public class FrequentTaskTrackerTest {
|
|||||||
worker.init();
|
worker.init();
|
||||||
|
|
||||||
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
|
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
|
||||||
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
|
||||||
remoteTaskTracker = testAS.actorSelection(akkaRemotePath);
|
remoteTaskTracker = testAS.actorSelection(akkaRemotePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user