mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
[dev] add idle detective mechanism
This commit is contained in:
parent
c16a2007c5
commit
ecb5a5bd1b
@ -20,6 +20,8 @@
|
||||
<hikaricp.version>3.4.2</hikaricp.version>
|
||||
<junit.version>5.6.1</junit.version>
|
||||
<kryo.version>5.0.0-RC5</kryo.version>
|
||||
|
||||
<logback.version>1.2.3</logback.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
@ -66,6 +68,13 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- log for test stage -->
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>${logback.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
@ -3,8 +3,10 @@ package com.github.kfcfans.powerjob.worker.common;
|
||||
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
||||
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
|
||||
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -26,7 +28,7 @@ public class OhMyConfig {
|
||||
/**
|
||||
* 调度服务器地址,ip:port 或 域名
|
||||
*/
|
||||
private List<String> serverAddress;
|
||||
private List<String> serverAddress = Lists.newArrayList();
|
||||
/**
|
||||
* 本地持久化方式,默认使用磁盘
|
||||
*/
|
||||
|
@ -54,6 +54,8 @@ public class ProcessorTracker {
|
||||
private OmsLogger omsLogger;
|
||||
// ProcessResult 上报失败的重试队列
|
||||
private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
|
||||
// 上一次空闲时间
|
||||
private long lastIdleTime;
|
||||
|
||||
private String taskTrackerAddress;
|
||||
private ActorSelection taskTrackerActorRef;
|
||||
@ -62,6 +64,8 @@ public class ProcessorTracker {
|
||||
private ScheduledExecutorService timingPool;
|
||||
|
||||
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100;
|
||||
// 最多,长时间空闲的 ProcessorTracker 会发起销毁请求
|
||||
private static final long MAX_IDLE_TIME = 120000;
|
||||
|
||||
// 当 ProcessorTracker 出现根本性错误(比如 Processor 创建失败,所有的任务直接失败)
|
||||
private boolean lethal = false;
|
||||
@ -82,6 +86,7 @@ public class ProcessorTracker {
|
||||
|
||||
this.omsLogger = new OmsServerLogger(instanceId);
|
||||
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
|
||||
this.lastIdleTime = -1L;
|
||||
|
||||
// 初始化 线程池,TimingPool 启动的任务会检查 ThreadPool,所以必须先初始化线程池,否则NPE
|
||||
initThreadPool();
|
||||
@ -221,6 +226,7 @@ public class ProcessorTracker {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
// 超时检查,如果超时则自动关闭 TaskTracker
|
||||
long interval = System.currentTimeMillis() - startTime;
|
||||
// 秒级任务的ProcessorTracker不应该关闭
|
||||
if (!TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) {
|
||||
@ -243,10 +249,27 @@ public class ProcessorTracker {
|
||||
}
|
||||
}
|
||||
|
||||
// 判断线程池活跃状态,长时间空闲则上报 TaskTracker 请求检查
|
||||
if (threadPool.getActiveCount() > 0) {
|
||||
lastIdleTime = -1;
|
||||
}else {
|
||||
if (lastIdleTime == -1) {
|
||||
lastIdleTime = System.currentTimeMillis();
|
||||
}else {
|
||||
long idleTime = System.currentTimeMillis() - lastIdleTime;
|
||||
if (idleTime > MAX_IDLE_TIME) {
|
||||
lastIdleTime = System.currentTimeMillis();
|
||||
log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker.", instanceId, idleTime);
|
||||
|
||||
taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildIdleReport(instanceId), null);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 上报当前 ProcessorTracker 负载
|
||||
long waitingNum = threadPool.getQueue().size();
|
||||
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceId, waitingNum);
|
||||
taskTrackerActorRef.tell(req, null);
|
||||
taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum), null);
|
||||
log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum);
|
||||
}
|
||||
|
||||
|
@ -267,8 +267,17 @@ public abstract class TaskTracker {
|
||||
* @param heartbeatReq ProcessorTracker(任务的执行管理器)发来的心跳包,包含了其当前状态
|
||||
*/
|
||||
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
|
||||
ptStatusHolder.updateStatus(heartbeatReq);
|
||||
log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
|
||||
ptStatusHolder.updateStatus(heartbeatReq);
|
||||
|
||||
// 上报空闲,检查是否已经接收到全部该 ProcessorTracker 负责的任务
|
||||
if (heartbeatReq.getType() == ProcessorTrackerStatusReportReq.IDLE) {
|
||||
List<TaskDO> unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, heartbeatReq.getAddress());
|
||||
if (!CollectionUtils.isEmpty(unfinishedTask)) {
|
||||
log.warn("[TaskTracker-{}] ProcessorTracker is idle now but have unfinished tasks: {}", instanceId, unfinishedTask);
|
||||
unfinishedTask.forEach(task -> updateTaskStatus(task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -33,7 +33,8 @@ public class ConnectionFactory {
|
||||
synchronized (ConnectionFactory.class) {
|
||||
if (dataSource == null) {
|
||||
|
||||
StoreStrategy strategy = OhMyWorker.getConfig().getStoreStrategy();
|
||||
// 兼容单元测试,否则没办法单独测试 DAO 层了
|
||||
StoreStrategy strategy = OhMyWorker.getConfig() == null ? StoreStrategy.DISK : OhMyWorker.getConfig().getStoreStrategy();
|
||||
|
||||
HikariConfig config = new HikariConfig();
|
||||
config.setDriverClassName("org.h2.Driver");
|
||||
|
@ -66,9 +66,8 @@ public class TaskDO {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TaskDO{" +
|
||||
return "{" +
|
||||
"taskId='" + taskId + '\'' +
|
||||
", instanceId='" + instanceId + '\'' +
|
||||
", taskName='" + taskName + '\'' +
|
||||
", address='" + address + '\'' +
|
||||
", status=" + status +
|
||||
|
@ -156,6 +156,23 @@ public class TaskPersistenceService {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
// 获取某个 ProcessorTracker 未完成的任务
|
||||
public List<TaskDO> getAllUnFinishedTaskByAddress(Long instanceId, String address) {
|
||||
try {
|
||||
String condition = String.format("status not in (%d, %d)", TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), TaskStatus.WORKER_PROCESS_FAILED.getValue());
|
||||
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setAddress(address);
|
||||
query.setQueryCondition(condition);
|
||||
|
||||
return execute(() -> taskDAO.simpleQuery(query));
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getAllTaskByAddress for instance(id={}) failed.", instanceId, e);
|
||||
}
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定状态的Task
|
||||
*/
|
||||
|
@ -16,6 +16,12 @@ import lombok.NoArgsConstructor;
|
||||
@NoArgsConstructor
|
||||
public class ProcessorTrackerStatusReportReq implements OmsSerializable {
|
||||
|
||||
public static final int IDLE = 1;
|
||||
public static final int LOAD = 2;
|
||||
|
||||
// IDLE 代表 ProcessorTracker 长期处于空闲状态,LOAD 代表 负载上报请求
|
||||
private int type;
|
||||
|
||||
private Long instanceId;
|
||||
|
||||
/**
|
||||
@ -33,11 +39,24 @@ public class ProcessorTrackerStatusReportReq implements OmsSerializable {
|
||||
*/
|
||||
private String address;
|
||||
|
||||
public ProcessorTrackerStatusReportReq(Long instanceId, long remainTaskNum) {
|
||||
this.instanceId = instanceId;
|
||||
this.remainTaskNum = remainTaskNum;
|
||||
|
||||
this.time = System.currentTimeMillis();
|
||||
this.address = OhMyWorker.getWorkerAddress();
|
||||
public static ProcessorTrackerStatusReportReq buildIdleReport(Long instanceId) {
|
||||
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq();
|
||||
req.type = IDLE;
|
||||
req.instanceId = instanceId;
|
||||
req.time = System.currentTimeMillis();
|
||||
req.address = OhMyWorker.getWorkerAddress();
|
||||
req.setRemainTaskNum(0);
|
||||
return req;
|
||||
}
|
||||
|
||||
public static ProcessorTrackerStatusReportReq buildLoadReport(Long instanceId, Long remainTaskNum) {
|
||||
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq();
|
||||
req.type = LOAD;
|
||||
req.instanceId = instanceId;
|
||||
req.time = System.currentTimeMillis();
|
||||
req.address = OhMyWorker.getWorkerAddress();
|
||||
req.setRemainTaskNum(remainTaskNum);
|
||||
return req;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,78 @@
|
||||
package com.github.kfcfans.powerjob;
|
||||
|
||||
import akka.actor.ActorSelection;
|
||||
import akka.actor.ActorSystem;
|
||||
import com.github.kfcfans.powerjob.common.ExecuteType;
|
||||
import com.github.kfcfans.powerjob.common.ProcessorType;
|
||||
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
||||
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
||||
import com.github.kfcfans.powerjob.worker.OhMyWorker;
|
||||
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
|
||||
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
|
||||
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
|
||||
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
|
||||
/**
|
||||
* 启动公共服务
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/6/17
|
||||
*/
|
||||
public class CommonTest {
|
||||
|
||||
protected static ActorSelection remoteProcessorTracker;
|
||||
protected static ActorSelection remoteTaskTracker;
|
||||
|
||||
@BeforeAll
|
||||
public static void startWorker() throws Exception {
|
||||
OhMyConfig ohMyConfig = new OhMyConfig();
|
||||
ohMyConfig.setAppName("oms-test");
|
||||
ohMyConfig.setEnableTestMode(true);
|
||||
|
||||
OhMyWorker worker = new OhMyWorker();
|
||||
worker.setConfig(ohMyConfig);
|
||||
worker.init();
|
||||
|
||||
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
|
||||
String address = NetUtils.getLocalHost() + ":27777";
|
||||
|
||||
remoteProcessorTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME));
|
||||
remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.Task_TRACKER_ACTOR_NAME));
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void stop() throws Exception {
|
||||
Thread.sleep(120000);
|
||||
}
|
||||
|
||||
public static TaskTrackerStartTaskReq genTaskTrackerStartTaskReq(String processor) {
|
||||
|
||||
InstanceInfo instanceInfo = new InstanceInfo();
|
||||
|
||||
instanceInfo.setJobId(1L);
|
||||
instanceInfo.setInstanceId(10086L);
|
||||
|
||||
instanceInfo.setExecuteType(ExecuteType.STANDALONE.name());
|
||||
instanceInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
|
||||
instanceInfo.setProcessorInfo(processor);
|
||||
|
||||
instanceInfo.setInstanceTimeoutMS(500000);
|
||||
|
||||
instanceInfo.setThreadConcurrency(5);
|
||||
instanceInfo.setTaskRetryNum(3);
|
||||
|
||||
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
|
||||
|
||||
req.setTaskTrackerAddress(NetUtils.getLocalHost() + ":27777");
|
||||
req.setInstanceInfo(instanceInfo);
|
||||
|
||||
req.setTaskId("0");
|
||||
req.setTaskName("ROOT_TASK");
|
||||
req.setTaskCurrentRetryNums(0);
|
||||
|
||||
return req;
|
||||
}
|
||||
}
|
@ -24,6 +24,7 @@ public class PersistenceServiceTest {
|
||||
public static void initTable() throws Exception {
|
||||
taskPersistenceService.init();
|
||||
|
||||
System.out.println("=============== init data ===============");
|
||||
List<TaskDO> taskList = Lists.newLinkedList();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
TaskDO task = new TaskDO();
|
||||
@ -39,10 +40,11 @@ public class PersistenceServiceTest {
|
||||
task.setAddress(NetUtils.getLocalHost());
|
||||
task.setLastModifiedTime(System.currentTimeMillis());
|
||||
task.setCreatedTime(System.currentTimeMillis());
|
||||
task.setLastReportTime(System.currentTimeMillis());
|
||||
task.setResult("");
|
||||
}
|
||||
|
||||
taskPersistenceService.batchSave(taskList);
|
||||
System.out.println("=============== init data ===============");
|
||||
taskList.forEach(System.out::println);
|
||||
}
|
||||
|
||||
@ -75,4 +77,11 @@ public class PersistenceServiceTest {
|
||||
System.out.println("updateLostTasks: " + success);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAllUnFinishedTaskByAddress() throws Exception {
|
||||
System.out.println("=============== testGetAllUnFinishedTaskByAddress ===============");
|
||||
List<TaskDO> res = taskPersistenceService.getAllUnFinishedTaskByAddress(10086L, NetUtils.getLocalHost());
|
||||
System.out.println(res);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
|
||||
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
||||
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
|
||||
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
||||
import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker;
|
||||
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
|
||||
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
@ -23,68 +24,20 @@ import org.junit.jupiter.api.Test;
|
||||
* @author tjq
|
||||
* @since 2020/3/24
|
||||
*/
|
||||
public class ProcessorTrackerTest {
|
||||
|
||||
private static ActorSelection remoteProcessorTracker;
|
||||
|
||||
@BeforeAll
|
||||
public static void startWorker() throws Exception {
|
||||
OhMyConfig ohMyConfig = new OhMyConfig();
|
||||
ohMyConfig.setAppName("oms-test");
|
||||
OhMyWorker worker = new OhMyWorker();
|
||||
worker.setConfig(ohMyConfig);
|
||||
worker.init();
|
||||
|
||||
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
|
||||
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost(), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
||||
remoteProcessorTracker = testAS.actorSelection(akkaRemotePath);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void stop() throws Exception {
|
||||
Thread.sleep(120000);
|
||||
}
|
||||
public class ProcessorTrackerTest extends CommonTest {
|
||||
|
||||
@Test
|
||||
public void testBasicProcessor() throws Exception {
|
||||
|
||||
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.oms.processors.TestBasicProcessor");
|
||||
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
|
||||
remoteProcessorTracker.tell(req, null);
|
||||
Thread.sleep(30000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapReduceProcessor() throws Exception {
|
||||
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.oms.processors.TestMapReduceProcessor");
|
||||
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestMapReduceProcessor");
|
||||
remoteProcessorTracker.tell(req, null);
|
||||
Thread.sleep(30000);
|
||||
}
|
||||
|
||||
private static TaskTrackerStartTaskReq genTaskTrackerStartTaskReq(String processor) {
|
||||
|
||||
InstanceInfo instanceInfo = new InstanceInfo();
|
||||
|
||||
instanceInfo.setJobId(1L);
|
||||
instanceInfo.setInstanceId(10086L);
|
||||
|
||||
instanceInfo.setExecuteType(ExecuteType.STANDALONE.name());
|
||||
instanceInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
|
||||
instanceInfo.setProcessorInfo(processor);
|
||||
|
||||
instanceInfo.setInstanceTimeoutMS(500000);
|
||||
|
||||
instanceInfo.setThreadConcurrency(5);
|
||||
instanceInfo.setTaskRetryNum(3);
|
||||
|
||||
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
|
||||
|
||||
req.setTaskTrackerAddress(NetUtils.getLocalHost());
|
||||
req.setInstanceInfo(instanceInfo);
|
||||
|
||||
req.setTaskId("0");
|
||||
req.setTaskName("ROOT_TASK");
|
||||
req.setTaskCurrentRetryNums(0);
|
||||
|
||||
return req;
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ public class TestUtils {
|
||||
public static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType, TimeExpressionType timeExpressionType) {
|
||||
ServerScheduleJobReq req = new ServerScheduleJobReq();
|
||||
|
||||
req.setJobId(1L);
|
||||
req.setInstanceId(10086L);
|
||||
req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT));
|
||||
|
||||
@ -38,15 +39,15 @@ public class TestUtils {
|
||||
switch (executeType) {
|
||||
case STANDALONE:
|
||||
req.setExecuteType(ExecuteType.STANDALONE.name());
|
||||
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor");
|
||||
req.setProcessorInfo("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
|
||||
break;
|
||||
case MAP_REDUCE:
|
||||
req.setExecuteType(ExecuteType.MAP_REDUCE.name());
|
||||
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestMapReduceProcessor");
|
||||
req.setProcessorInfo("com.github.kfcfans.powerjob.processors.TestMapReduceProcessor");
|
||||
break;
|
||||
case BROADCAST:
|
||||
req.setExecuteType(ExecuteType.BROADCAST.name());
|
||||
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBroadcastProcessor");
|
||||
req.setProcessorInfo("com.github.kfcfans.powerjob.processors.TestBroadcastProcessor");
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,40 @@
|
||||
package com.github.kfcfans.powerjob.function;
|
||||
|
||||
import com.github.kfcfans.powerjob.CommonTest;
|
||||
import com.github.kfcfans.powerjob.TestUtils;
|
||||
import com.github.kfcfans.powerjob.common.ExecuteType;
|
||||
import com.github.kfcfans.powerjob.common.TimeExpressionType;
|
||||
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
|
||||
import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker;
|
||||
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
|
||||
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
|
||||
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
/**
|
||||
* 空闲测试
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/6/17
|
||||
*/
|
||||
public class IdleTest extends CommonTest {
|
||||
|
||||
@Test
|
||||
public void testProcessorTrackerSendIdleReport() throws Exception {
|
||||
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
|
||||
ProcessorTracker pt = new ProcessorTracker(req);
|
||||
Thread.sleep(300000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTaskTrackerProcessorIdle() throws Exception {
|
||||
|
||||
ProcessorTrackerStatusReportReq req = ProcessorTrackerStatusReportReq.buildIdleReport(10086L);
|
||||
ServerScheduleJobReq serverScheduleJobReq = TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.API);
|
||||
|
||||
TaskTracker taskTracker = TaskTracker.create(serverScheduleJobReq);
|
||||
if (taskTracker != null) {
|
||||
taskTracker.receiveProcessorTrackerHeartbeat(req);
|
||||
}
|
||||
}
|
||||
}
|
@ -5,7 +5,7 @@ akka {
|
||||
allow-java-serialization = off
|
||||
|
||||
serialization-bindings {
|
||||
"OmsSerializable" = jackson-cbor
|
||||
"com.github.kfcfans.powerjob.common.OmsSerializable" = jackson-cbor
|
||||
}
|
||||
}
|
||||
remote {
|
||||
|
Loading…
x
Reference in New Issue
Block a user