mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: merge official processor upgrade
This commit is contained in:
commit
f1baef7de4
@ -14,7 +14,7 @@ public class RemoteConstant {
|
|||||||
|
|
||||||
public static final String WORKER_ACTOR_SYSTEM_NAME = "oms";
|
public static final String WORKER_ACTOR_SYSTEM_NAME = "oms";
|
||||||
|
|
||||||
public static final String Task_TRACKER_ACTOR_NAME = "task_tracker";
|
public static final String TASK_TRACKER_ACTOR_NAME = "task_tracker";
|
||||||
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
|
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
|
||||||
public static final String WORKER_ACTOR_NAME = "worker";
|
public static final String WORKER_ACTOR_NAME = "worker";
|
||||||
public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting";
|
public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting";
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package tech.powerjob.official.processors.impl;
|
package tech.powerjob.official.processors.impl;
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import com.alibaba.fastjson.JSONValidator;
|
import com.alibaba.fastjson.JSONValidator;
|
||||||
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
|
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
|
||||||
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
|
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
|
||||||
@ -20,14 +21,16 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* common http processor
|
* common http processor
|
||||||
*
|
*
|
||||||
* @author tjq
|
* @author tjq
|
||||||
|
* @author Jiang Jining
|
||||||
* @since 2021/1/30
|
* @since 2021/1/30
|
||||||
*/
|
*/
|
||||||
public class HttpProcessor extends CommonBasicProcessor {
|
public class HttpProcessor extends CommonBasicProcessor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 60 seconds
|
* Default timeout is 60 seconds.
|
||||||
*/
|
*/
|
||||||
private static final int DEFAULT_TIMEOUT = 60;
|
private static final int DEFAULT_TIMEOUT = 60;
|
||||||
|
private static final int HTTP_SUCCESS_CODE = 200;
|
||||||
private static final Map<Integer, OkHttpClient> CLIENT_STORE = new ConcurrentHashMap<>();
|
private static final Map<Integer, OkHttpClient> CLIENT_STORE = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -35,6 +38,12 @@ public class HttpProcessor extends CommonBasicProcessor {
|
|||||||
OmsLogger omsLogger = taskContext.getOmsLogger();
|
OmsLogger omsLogger = taskContext.getOmsLogger();
|
||||||
HttpParams httpParams = JSON.parseObject(CommonUtils.parseParams(taskContext), HttpParams.class);
|
HttpParams httpParams = JSON.parseObject(CommonUtils.parseParams(taskContext), HttpParams.class);
|
||||||
|
|
||||||
|
if (httpParams == null) {
|
||||||
|
String message = "httpParams is null, please check jobParam configuration.";
|
||||||
|
omsLogger.warn(message);
|
||||||
|
return new ProcessResult(false, message);
|
||||||
|
}
|
||||||
|
|
||||||
if (StringUtils.isEmpty(httpParams.url)) {
|
if (StringUtils.isEmpty(httpParams.url)) {
|
||||||
return new ProcessResult(false, "url can't be empty!");
|
return new ProcessResult(false, "url can't be empty!");
|
||||||
}
|
}
|
||||||
@ -54,10 +63,17 @@ public class HttpProcessor extends CommonBasicProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// set default mediaType
|
// set default mediaType
|
||||||
if (!"GET".equals(httpParams.method) && StringUtils.isEmpty(httpParams.mediaType) && JSONValidator.from(httpParams.body).validate()) {
|
if (!"GET".equals(httpParams.method)) {
|
||||||
|
// set default request body
|
||||||
|
if (StringUtils.isEmpty(httpParams.body)) {
|
||||||
|
httpParams.body = new JSONObject().toJSONString();
|
||||||
|
omsLogger.warn("try to use default request body:{}", httpParams.body);
|
||||||
|
}
|
||||||
|
if (JSONValidator.from(httpParams.body).validate() && StringUtils.isEmpty(httpParams.mediaType)) {
|
||||||
httpParams.mediaType = "application/json";
|
httpParams.mediaType = "application/json";
|
||||||
omsLogger.warn("try to use 'application/json' as media type");
|
omsLogger.warn("try to use 'application/json' as media type");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// set default timeout
|
// set default timeout
|
||||||
if (httpParams.timeout == null) {
|
if (httpParams.timeout == null) {
|
||||||
@ -95,9 +111,15 @@ public class HttpProcessor extends CommonBasicProcessor {
|
|||||||
msgBody = response.body().string();
|
msgBody = response.body().string();
|
||||||
}
|
}
|
||||||
|
|
||||||
String res = String.format("code:%d,body:%s", response.code(), msgBody);
|
int responseCode = response.code();
|
||||||
|
String res = String.format("code:%d, body:%s", responseCode, msgBody);
|
||||||
return new ProcessResult(true, res);
|
boolean success = true;
|
||||||
|
if (responseCode != HTTP_SUCCESS_CODE) {
|
||||||
|
success = false;
|
||||||
|
omsLogger.warn("{} url: {} failed, response code is {}, response body is {}",
|
||||||
|
httpParams.method, httpParams.url, responseCode, msgBody);
|
||||||
|
}
|
||||||
|
return new ProcessResult(success, res);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
|
@ -7,6 +7,8 @@ import com.github.kfcfans.powerjob.worker.log.OmsLogger;
|
|||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.commons.lang3.SystemUtils;
|
||||||
import tech.powerjob.official.processors.CommonBasicProcessor;
|
import tech.powerjob.official.processors.CommonBasicProcessor;
|
||||||
import tech.powerjob.official.processors.util.CommonUtils;
|
import tech.powerjob.official.processors.util.CommonUtils;
|
||||||
|
|
||||||
@ -20,47 +22,62 @@ import java.util.concurrent.ForkJoinPool;
|
|||||||
* 脚本处理器
|
* 脚本处理器
|
||||||
*
|
*
|
||||||
* @author tjq
|
* @author tjq
|
||||||
|
* @author Jiang Jining
|
||||||
* @since 2020/4/16
|
* @since 2020/4/16
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
|
public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
|
||||||
|
|
||||||
private static final ForkJoinPool pool = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors());
|
private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors());
|
||||||
private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
|
private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
|
||||||
|
static final String SH_SHELL = "/bin/sh";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ProcessResult process0(TaskContext context) throws Exception {
|
protected ProcessResult process0(TaskContext context) throws Exception {
|
||||||
OmsLogger omsLogger = context.getOmsLogger();
|
OmsLogger omsLogger = context.getOmsLogger();
|
||||||
omsLogger.info("SYSTEM ===> ScriptProcessor start to process");
|
omsLogger.info("SYSTEM ===> ScriptProcessor start to process");
|
||||||
|
String scriptParams = CommonUtils.parseParams(context);
|
||||||
|
if (scriptParams == null) {
|
||||||
|
String message = "scriptParams is null, please check jobParam configuration.";
|
||||||
|
omsLogger.warn(message);
|
||||||
|
return new ProcessResult(false, message);
|
||||||
|
}
|
||||||
|
String scriptPath = prepareScriptFile(context.getInstanceId(), scriptParams);
|
||||||
|
|
||||||
String scriptPath = prepareScriptFile(context.getInstanceId(), CommonUtils.parseParams(context));
|
if (SystemUtils.IS_OS_WINDOWS) {
|
||||||
|
if (StringUtils.equals(getRunCommand(), SH_SHELL)) {
|
||||||
|
String message = String.format("Current OS is %s where shell scripts cannot run.", SystemUtils.OS_NAME);
|
||||||
|
omsLogger.warn(message);
|
||||||
|
return new ProcessResult(false, message);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
// 1. 授权
|
// 1. 授权
|
||||||
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
|
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
|
||||||
// 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁)
|
// 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁)
|
||||||
chmodPb.start().waitFor();
|
chmodPb.start().waitFor();
|
||||||
|
}
|
||||||
|
|
||||||
// 2. 执行目标脚本
|
// 2. 执行目标脚本
|
||||||
ProcessBuilder pb = new ProcessBuilder(getRunCommand(), scriptPath);
|
ProcessBuilder pb = new ProcessBuilder(getRunCommand(), scriptPath);
|
||||||
Process process = pb.start();
|
Process process = pb.start();
|
||||||
|
|
||||||
StringBuilder inputSB = new StringBuilder();
|
StringBuilder inputBuilder = new StringBuilder();
|
||||||
StringBuilder errorSB = new StringBuilder();
|
StringBuilder errorBuilder = new StringBuilder();
|
||||||
|
|
||||||
boolean success = true;
|
boolean success = true;
|
||||||
String result;
|
String result;
|
||||||
|
|
||||||
try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) {
|
try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) {
|
||||||
|
|
||||||
pool.execute(() -> copyStream(is, inputSB, omsLogger));
|
POOL.execute(() -> copyStream(is, inputBuilder, omsLogger));
|
||||||
pool.execute(() -> copyStream(es, errorSB, omsLogger));
|
POOL.execute(() -> copyStream(es, errorBuilder, omsLogger));
|
||||||
|
|
||||||
success = process.waitFor() == 0;
|
success = process.waitFor() == 0;
|
||||||
|
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
omsLogger.info("SYSTEM ===> ScriptProcessor has been interrupted");
|
omsLogger.info("SYSTEM ===> ScriptProcessor has been interrupted");
|
||||||
} finally {
|
} finally {
|
||||||
result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString());
|
result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder.toString(), errorBuilder.toString());
|
||||||
}
|
}
|
||||||
return new ProcessResult(success, result);
|
return new ProcessResult(success, result);
|
||||||
}
|
}
|
||||||
@ -112,6 +129,7 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 生成脚本名称
|
* 生成脚本名称
|
||||||
|
* @param instanceId id of instance
|
||||||
* @return 文件名称
|
* @return 文件名称
|
||||||
*/
|
*/
|
||||||
protected abstract String getScriptName(Long instanceId);
|
protected abstract String getScriptName(Long instanceId);
|
||||||
|
@ -15,6 +15,6 @@ public class ShellProcessor extends AbstractScriptProcessor {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String getRunCommand() {
|
protected String getRunCommand() {
|
||||||
return "/bin/sh";
|
return SH_SHELL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,6 +12,14 @@ import tech.powerjob.official.processors.TestUtils;
|
|||||||
*/
|
*/
|
||||||
class HttpProcessorTest {
|
class HttpProcessorTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testDefaultMethod() throws Exception {
|
||||||
|
String url = "https://www.baidu.com";
|
||||||
|
JSONObject params = new JSONObject();
|
||||||
|
params.put("url", url);
|
||||||
|
System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString())));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testGet() throws Exception {
|
void testGet() throws Exception {
|
||||||
String url = "https://www.baidu.com";
|
String url = "https://www.baidu.com";
|
||||||
@ -34,6 +42,25 @@ class HttpProcessorTest {
|
|||||||
System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString())));
|
System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testPostDefaultJson() throws Exception {
|
||||||
|
String url = "https://mock.uutool.cn/4f5qfgcdahj0?test=true";
|
||||||
|
JSONObject params = new JSONObject();
|
||||||
|
params.put("url", url);
|
||||||
|
params.put("method", "POST");
|
||||||
|
System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString())));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testPostDefaultWithMediaType() throws Exception {
|
||||||
|
String url = "https://mock.uutool.cn/4f5qfgcdahj0?test=true";
|
||||||
|
JSONObject params = new JSONObject();
|
||||||
|
params.put("url", url);
|
||||||
|
params.put("method", "POST");
|
||||||
|
params.put("mediaType", "application/json");
|
||||||
|
System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString())));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testTimeout() throws Exception {
|
void testTimeout() throws Exception {
|
||||||
String url = "http://localhost:7700/tmp/sleep";
|
String url = "http://localhost:7700/tmp/sleep";
|
||||||
|
@ -5,7 +5,8 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<artifactId>powerjob-server</artifactId>
|
<artifactId>powerjob-server</artifactId>
|
||||||
<groupId>com.github.kfcfans</groupId>
|
<groupId>com.github.kfcfans</groupId>
|
||||||
<version>3.4.6</version>
|
<version>4.0.0</version>
|
||||||
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
@ -5,7 +5,8 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<artifactId>powerjob-server</artifactId>
|
<artifactId>powerjob-server</artifactId>
|
||||||
<groupId>com.github.kfcfans</groupId>
|
<groupId>com.github.kfcfans</groupId>
|
||||||
<version>3.4.6</version>
|
<version>4.0.0</version>
|
||||||
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
@ -5,7 +5,8 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<artifactId>powerjob-server</artifactId>
|
<artifactId>powerjob-server</artifactId>
|
||||||
<groupId>com.github.kfcfans</groupId>
|
<groupId>com.github.kfcfans</groupId>
|
||||||
<version>3.4.6</version>
|
<version>4.0.0</version>
|
||||||
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
@ -5,7 +5,8 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<artifactId>powerjob-server</artifactId>
|
<artifactId>powerjob-server</artifactId>
|
||||||
<groupId>com.github.kfcfans</groupId>
|
<groupId>com.github.kfcfans</groupId>
|
||||||
<version>3.4.6</version>
|
<version>4.0.0</version>
|
||||||
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
@ -5,7 +5,8 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<artifactId>powerjob-server</artifactId>
|
<artifactId>powerjob-server</artifactId>
|
||||||
<groupId>com.github.kfcfans</groupId>
|
<groupId>com.github.kfcfans</groupId>
|
||||||
<version>3.4.6</version>
|
<version>4.0.0</version>
|
||||||
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
@ -5,7 +5,8 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<artifactId>powerjob-server</artifactId>
|
<artifactId>powerjob-server</artifactId>
|
||||||
<groupId>com.github.kfcfans</groupId>
|
<groupId>com.github.kfcfans</groupId>
|
||||||
<version>3.4.6</version>
|
<version>4.0.0</version>
|
||||||
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
@ -5,7 +5,8 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<artifactId>powerjob-server</artifactId>
|
<artifactId>powerjob-server</artifactId>
|
||||||
<groupId>com.github.kfcfans</groupId>
|
<groupId>com.github.kfcfans</groupId>
|
||||||
<version>3.4.6</version>
|
<version>4.0.0</version>
|
||||||
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
@ -120,7 +120,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
|
|||||||
|
|
||||||
ActorRef taskTrackerActorRef = actorSystem.actorOf(TaskTrackerActor.props(workerRuntime)
|
ActorRef taskTrackerActorRef = actorSystem.actorOf(TaskTrackerActor.props(workerRuntime)
|
||||||
.withDispatcher("akka.task-tracker-dispatcher")
|
.withDispatcher("akka.task-tracker-dispatcher")
|
||||||
.withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
.withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.TASK_TRACKER_ACTOR_NAME);
|
||||||
actorSystem.actorOf(ProcessorTrackerActor.props(workerRuntime)
|
actorSystem.actorOf(ProcessorTrackerActor.props(workerRuntime)
|
||||||
.withDispatcher("akka.processor-tracker-dispatcher")
|
.withDispatcher("akka.processor-tracker-dispatcher")
|
||||||
.withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
.withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
||||||
|
@ -53,7 +53,8 @@ public abstract class MapProcessor implements BasicProcessor {
|
|||||||
ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);
|
ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);
|
||||||
|
|
||||||
// 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常)
|
// 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常)
|
||||||
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
|
||||||
|
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.TASK_TRACKER_ACTOR_NAME);
|
||||||
boolean requestSucceed = AkkaUtils.reliableTransmit(workerRuntime.getActorSystem().actorSelection(akkaRemotePath), req);
|
boolean requestSucceed = AkkaUtils.reliableTransmit(workerRuntime.getActorSystem().actorSelection(akkaRemotePath), req);
|
||||||
|
|
||||||
if (requestSucceed) {
|
if (requestSucceed) {
|
||||||
|
@ -108,7 +108,8 @@ public class ProcessorTracker {
|
|||||||
this.instanceInfo = request.getInstanceInfo();
|
this.instanceInfo = request.getInstanceInfo();
|
||||||
this.instanceId = request.getInstanceInfo().getInstanceId();
|
this.instanceId = request.getInstanceInfo().getInstanceId();
|
||||||
this.taskTrackerAddress = request.getTaskTrackerAddress();
|
this.taskTrackerAddress = request.getTaskTrackerAddress();
|
||||||
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
|
||||||
|
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
|
||||||
this.taskTrackerActorRef = workerRuntime.getActorSystem().actorSelection(akkaRemotePath);
|
this.taskTrackerActorRef = workerRuntime.getActorSystem().actorSelection(akkaRemotePath);
|
||||||
|
|
||||||
this.omsLogger = new OmsServerLogger(instanceId, workerRuntime.getOmsLogHandler());
|
this.omsLogger = new OmsServerLogger(instanceId, workerRuntime.getOmsLogHandler());
|
||||||
|
@ -37,7 +37,7 @@ public class CommonTaskTrackerTest {
|
|||||||
worker.init();
|
worker.init();
|
||||||
|
|
||||||
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
|
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
|
||||||
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
|
||||||
remoteTaskTracker = testAS.actorSelection(akkaRemotePath);
|
remoteTaskTracker = testAS.actorSelection(akkaRemotePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ public class CommonTest {
|
|||||||
String address = NetUtils.getLocalHost() + ":27777";
|
String address = NetUtils.getLocalHost() + ":27777";
|
||||||
|
|
||||||
remoteProcessorTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME));
|
remoteProcessorTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME));
|
||||||
remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.Task_TRACKER_ACTOR_NAME));
|
remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.TASK_TRACKER_ACTOR_NAME));
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterAll
|
@AfterAll
|
||||||
|
@ -35,7 +35,7 @@ public class FrequentTaskTrackerTest {
|
|||||||
worker.init();
|
worker.init();
|
||||||
|
|
||||||
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
|
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
|
||||||
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
|
||||||
remoteTaskTracker = testAS.actorSelection(akkaRemotePath);
|
remoteTaskTracker = testAS.actorSelection(akkaRemotePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user