diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml
index 5c8ab6af..50795566 100644
--- a/powerjob-worker/pom.xml
+++ b/powerjob-worker/pom.xml
@@ -20,6 +20,8 @@
3.4.2
5.6.1
5.0.0-RC5
+
+ 1.2.3
@@ -66,6 +68,13 @@
test
+
+
+ ch.qos.logback
+ logback-classic
+ ${logback.version}
+ test
+
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java
index 8c60f627..f3a45cc3 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java
@@ -3,8 +3,10 @@ package com.github.kfcfans.powerjob.worker.common;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
+import com.google.common.collect.Lists;
import lombok.Data;
+import java.util.Collections;
import java.util.List;
/**
@@ -26,7 +28,7 @@ public class OhMyConfig {
/**
* 调度服务器地址,ip:port 或 域名
*/
- private List serverAddress;
+ private List serverAddress = Lists.newArrayList();
/**
* 本地持久化方式,默认使用磁盘
*/
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java
index d0df40bf..78594cfc 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java
@@ -54,6 +54,8 @@ public class ProcessorTracker {
private OmsLogger omsLogger;
// ProcessResult 上报失败的重试队列
private Queue statusReportRetryQueue;
+ // 上一次空闲时间
+ private long lastIdleTime;
private String taskTrackerAddress;
private ActorSelection taskTrackerActorRef;
@@ -62,6 +64,8 @@ public class ProcessorTracker {
private ScheduledExecutorService timingPool;
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100;
+ // 最多,长时间空闲的 ProcessorTracker 会发起销毁请求
+ private static final long MAX_IDLE_TIME = 120000;
// 当 ProcessorTracker 出现根本性错误(比如 Processor 创建失败,所有的任务直接失败)
private boolean lethal = false;
@@ -82,6 +86,7 @@ public class ProcessorTracker {
this.omsLogger = new OmsServerLogger(instanceId);
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
+ this.lastIdleTime = -1L;
// 初始化 线程池,TimingPool 启动的任务会检查 ThreadPool,所以必须先初始化线程池,否则NPE
initThreadPool();
@@ -221,6 +226,7 @@ public class ProcessorTracker {
@Override
public void run() {
+ // 超时检查,如果超时则自动关闭 TaskTracker
long interval = System.currentTimeMillis() - startTime;
// 秒级任务的ProcessorTracker不应该关闭
if (!TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) {
@@ -243,10 +249,27 @@ public class ProcessorTracker {
}
}
+ // 判断线程池活跃状态,长时间空闲则上报 TaskTracker 请求检查
+ if (threadPool.getActiveCount() > 0) {
+ lastIdleTime = -1;
+ }else {
+ if (lastIdleTime == -1) {
+ lastIdleTime = System.currentTimeMillis();
+ }else {
+ long idleTime = System.currentTimeMillis() - lastIdleTime;
+ if (idleTime > MAX_IDLE_TIME) {
+ lastIdleTime = System.currentTimeMillis();
+ log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker.", instanceId, idleTime);
+
+ taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildIdleReport(instanceId), null);
+ return;
+ }
+ }
+ }
+
// 上报当前 ProcessorTracker 负载
long waitingNum = threadPool.getQueue().size();
- ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceId, waitingNum);
- taskTrackerActorRef.tell(req, null);
+ taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum), null);
log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum);
}
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java
index cfb8b907..59799184 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java
@@ -267,8 +267,17 @@ public abstract class TaskTracker {
* @param heartbeatReq ProcessorTracker(任务的执行管理器)发来的心跳包,包含了其当前状态
*/
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
- ptStatusHolder.updateStatus(heartbeatReq);
log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
+ ptStatusHolder.updateStatus(heartbeatReq);
+
+ // 上报空闲,检查是否已经接收到全部该 ProcessorTracker 负责的任务
+ if (heartbeatReq.getType() == ProcessorTrackerStatusReportReq.IDLE) {
+ List unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, heartbeatReq.getAddress());
+ if (!CollectionUtils.isEmpty(unfinishedTask)) {
+ log.warn("[TaskTracker-{}] ProcessorTracker is idle now but have unfinished tasks: {}", instanceId, unfinishedTask);
+ unfinishedTask.forEach(task -> updateTaskStatus(task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
+ }
+ }
}
/**
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/ConnectionFactory.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/ConnectionFactory.java
index 0a26f996..a941c34f 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/ConnectionFactory.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/ConnectionFactory.java
@@ -33,7 +33,8 @@ public class ConnectionFactory {
synchronized (ConnectionFactory.class) {
if (dataSource == null) {
- StoreStrategy strategy = OhMyWorker.getConfig().getStoreStrategy();
+ // 兼容单元测试,否则没办法单独测试 DAO 层了
+ StoreStrategy strategy = OhMyWorker.getConfig() == null ? StoreStrategy.DISK : OhMyWorker.getConfig().getStoreStrategy();
HikariConfig config = new HikariConfig();
config.setDriverClassName("org.h2.Driver");
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskDO.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskDO.java
index 840b7088..ef5041d6 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskDO.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskDO.java
@@ -66,9 +66,8 @@ public class TaskDO {
@Override
public String toString() {
- return "TaskDO{" +
+ return "{" +
"taskId='" + taskId + '\'' +
- ", instanceId='" + instanceId + '\'' +
", taskName='" + taskName + '\'' +
", address='" + address + '\'' +
", status=" + status +
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java
index 7788adeb..0180b9ca 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java
@@ -156,6 +156,23 @@ public class TaskPersistenceService {
return Lists.newArrayList();
}
+ // 获取某个 ProcessorTracker 未完成的任务
+ public List getAllUnFinishedTaskByAddress(Long instanceId, String address) {
+ try {
+ String condition = String.format("status not in (%d, %d)", TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), TaskStatus.WORKER_PROCESS_FAILED.getValue());
+
+ SimpleTaskQuery query = new SimpleTaskQuery();
+ query.setInstanceId(instanceId);
+ query.setAddress(address);
+ query.setQueryCondition(condition);
+
+ return execute(() -> taskDAO.simpleQuery(query));
+ }catch (Exception e) {
+ log.error("[TaskPersistenceService] getAllTaskByAddress for instance(id={}) failed.", instanceId, e);
+ }
+ return Lists.newArrayList();
+ }
+
/**
* 获取指定状态的Task
*/
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorTrackerStatusReportReq.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorTrackerStatusReportReq.java
index 8a924958..37941078 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorTrackerStatusReportReq.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorTrackerStatusReportReq.java
@@ -16,6 +16,12 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
public class ProcessorTrackerStatusReportReq implements OmsSerializable {
+ public static final int IDLE = 1;
+ public static final int LOAD = 2;
+
+ // IDLE 代表 ProcessorTracker 长期处于空闲状态,LOAD 代表 负载上报请求
+ private int type;
+
private Long instanceId;
/**
@@ -33,11 +39,24 @@ public class ProcessorTrackerStatusReportReq implements OmsSerializable {
*/
private String address;
- public ProcessorTrackerStatusReportReq(Long instanceId, long remainTaskNum) {
- this.instanceId = instanceId;
- this.remainTaskNum = remainTaskNum;
- this.time = System.currentTimeMillis();
- this.address = OhMyWorker.getWorkerAddress();
+ public static ProcessorTrackerStatusReportReq buildIdleReport(Long instanceId) {
+ ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq();
+ req.type = IDLE;
+ req.instanceId = instanceId;
+ req.time = System.currentTimeMillis();
+ req.address = OhMyWorker.getWorkerAddress();
+ req.setRemainTaskNum(0);
+ return req;
+ }
+
+ public static ProcessorTrackerStatusReportReq buildLoadReport(Long instanceId, Long remainTaskNum) {
+ ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq();
+ req.type = LOAD;
+ req.instanceId = instanceId;
+ req.time = System.currentTimeMillis();
+ req.address = OhMyWorker.getWorkerAddress();
+ req.setRemainTaskNum(remainTaskNum);
+ return req;
}
}
diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTest.java
new file mode 100644
index 00000000..ba884113
--- /dev/null
+++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTest.java
@@ -0,0 +1,78 @@
+package com.github.kfcfans.powerjob;
+
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import com.github.kfcfans.powerjob.common.ExecuteType;
+import com.github.kfcfans.powerjob.common.ProcessorType;
+import com.github.kfcfans.powerjob.common.RemoteConstant;
+import com.github.kfcfans.powerjob.common.utils.NetUtils;
+import com.github.kfcfans.powerjob.worker.OhMyWorker;
+import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
+import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
+import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
+import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
+import com.typesafe.config.ConfigFactory;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+/**
+ * 启动公共服务
+ *
+ * @author tjq
+ * @since 2020/6/17
+ */
+public class CommonTest {
+
+ protected static ActorSelection remoteProcessorTracker;
+ protected static ActorSelection remoteTaskTracker;
+
+ @BeforeAll
+ public static void startWorker() throws Exception {
+ OhMyConfig ohMyConfig = new OhMyConfig();
+ ohMyConfig.setAppName("oms-test");
+ ohMyConfig.setEnableTestMode(true);
+
+ OhMyWorker worker = new OhMyWorker();
+ worker.setConfig(ohMyConfig);
+ worker.init();
+
+ ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
+ String address = NetUtils.getLocalHost() + ":27777";
+
+ remoteProcessorTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME));
+ remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.Task_TRACKER_ACTOR_NAME));
+ }
+
+ @AfterAll
+ public static void stop() throws Exception {
+ Thread.sleep(120000);
+ }
+
+ public static TaskTrackerStartTaskReq genTaskTrackerStartTaskReq(String processor) {
+
+ InstanceInfo instanceInfo = new InstanceInfo();
+
+ instanceInfo.setJobId(1L);
+ instanceInfo.setInstanceId(10086L);
+
+ instanceInfo.setExecuteType(ExecuteType.STANDALONE.name());
+ instanceInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
+ instanceInfo.setProcessorInfo(processor);
+
+ instanceInfo.setInstanceTimeoutMS(500000);
+
+ instanceInfo.setThreadConcurrency(5);
+ instanceInfo.setTaskRetryNum(3);
+
+ TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
+
+ req.setTaskTrackerAddress(NetUtils.getLocalHost() + ":27777");
+ req.setInstanceInfo(instanceInfo);
+
+ req.setTaskId("0");
+ req.setTaskName("ROOT_TASK");
+ req.setTaskCurrentRetryNums(0);
+
+ return req;
+ }
+}
diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java
index a35aeddf..8f0482fb 100644
--- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java
+++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java
@@ -24,6 +24,7 @@ public class PersistenceServiceTest {
public static void initTable() throws Exception {
taskPersistenceService.init();
+ System.out.println("=============== init data ===============");
List taskList = Lists.newLinkedList();
for (int i = 0; i < 10; i++) {
TaskDO task = new TaskDO();
@@ -39,10 +40,11 @@ public class PersistenceServiceTest {
task.setAddress(NetUtils.getLocalHost());
task.setLastModifiedTime(System.currentTimeMillis());
task.setCreatedTime(System.currentTimeMillis());
+ task.setLastReportTime(System.currentTimeMillis());
+ task.setResult("");
}
taskPersistenceService.batchSave(taskList);
- System.out.println("=============== init data ===============");
taskList.forEach(System.out::println);
}
@@ -75,4 +77,11 @@ public class PersistenceServiceTest {
System.out.println("updateLostTasks: " + success);
}
+ @Test
+ public void testGetAllUnFinishedTaskByAddress() throws Exception {
+ System.out.println("=============== testGetAllUnFinishedTaskByAddress ===============");
+ List res = taskPersistenceService.getAllUnFinishedTaskByAddress(10086L, NetUtils.getLocalHost());
+ System.out.println(res);
+ }
+
}
diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/ProcessorTrackerTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/ProcessorTrackerTest.java
index 5d57643d..8c3d6a1d 100644
--- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/ProcessorTrackerTest.java
+++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/ProcessorTrackerTest.java
@@ -9,6 +9,7 @@ import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
+import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker;
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import com.typesafe.config.ConfigFactory;
@@ -23,68 +24,20 @@ import org.junit.jupiter.api.Test;
* @author tjq
* @since 2020/3/24
*/
-public class ProcessorTrackerTest {
-
- private static ActorSelection remoteProcessorTracker;
-
- @BeforeAll
- public static void startWorker() throws Exception {
- OhMyConfig ohMyConfig = new OhMyConfig();
- ohMyConfig.setAppName("oms-test");
- OhMyWorker worker = new OhMyWorker();
- worker.setConfig(ohMyConfig);
- worker.init();
-
- ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
- String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost(), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
- remoteProcessorTracker = testAS.actorSelection(akkaRemotePath);
- }
-
- @AfterAll
- public static void stop() throws Exception {
- Thread.sleep(120000);
- }
+public class ProcessorTrackerTest extends CommonTest {
@Test
public void testBasicProcessor() throws Exception {
- TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.oms.processors.TestBasicProcessor");
+ TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
remoteProcessorTracker.tell(req, null);
Thread.sleep(30000);
}
@Test
public void testMapReduceProcessor() throws Exception {
- TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.oms.processors.TestMapReduceProcessor");
+ TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestMapReduceProcessor");
remoteProcessorTracker.tell(req, null);
Thread.sleep(30000);
}
-
- private static TaskTrackerStartTaskReq genTaskTrackerStartTaskReq(String processor) {
-
- InstanceInfo instanceInfo = new InstanceInfo();
-
- instanceInfo.setJobId(1L);
- instanceInfo.setInstanceId(10086L);
-
- instanceInfo.setExecuteType(ExecuteType.STANDALONE.name());
- instanceInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
- instanceInfo.setProcessorInfo(processor);
-
- instanceInfo.setInstanceTimeoutMS(500000);
-
- instanceInfo.setThreadConcurrency(5);
- instanceInfo.setTaskRetryNum(3);
-
- TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
-
- req.setTaskTrackerAddress(NetUtils.getLocalHost());
- req.setInstanceInfo(instanceInfo);
-
- req.setTaskId("0");
- req.setTaskName("ROOT_TASK");
- req.setTaskCurrentRetryNums(0);
-
- return req;
- }
}
diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/TestUtils.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/TestUtils.java
index 427edd4f..c55ce57f 100644
--- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/TestUtils.java
+++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/TestUtils.java
@@ -19,6 +19,7 @@ public class TestUtils {
public static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType, TimeExpressionType timeExpressionType) {
ServerScheduleJobReq req = new ServerScheduleJobReq();
+ req.setJobId(1L);
req.setInstanceId(10086L);
req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT));
@@ -38,15 +39,15 @@ public class TestUtils {
switch (executeType) {
case STANDALONE:
req.setExecuteType(ExecuteType.STANDALONE.name());
- req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor");
+ req.setProcessorInfo("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
break;
case MAP_REDUCE:
req.setExecuteType(ExecuteType.MAP_REDUCE.name());
- req.setProcessorInfo("com.github.kfcfans.oms.processors.TestMapReduceProcessor");
+ req.setProcessorInfo("com.github.kfcfans.powerjob.processors.TestMapReduceProcessor");
break;
case BROADCAST:
req.setExecuteType(ExecuteType.BROADCAST.name());
- req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBroadcastProcessor");
+ req.setProcessorInfo("com.github.kfcfans.powerjob.processors.TestBroadcastProcessor");
break;
}
diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/function/IdleTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/function/IdleTest.java
new file mode 100644
index 00000000..0a09a4df
--- /dev/null
+++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/function/IdleTest.java
@@ -0,0 +1,40 @@
+package com.github.kfcfans.powerjob.function;
+
+import com.github.kfcfans.powerjob.CommonTest;
+import com.github.kfcfans.powerjob.TestUtils;
+import com.github.kfcfans.powerjob.common.ExecuteType;
+import com.github.kfcfans.powerjob.common.TimeExpressionType;
+import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
+import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker;
+import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
+import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
+import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
+import org.junit.jupiter.api.Test;
+
+/**
+ * 空闲测试
+ *
+ * @author tjq
+ * @since 2020/6/17
+ */
+public class IdleTest extends CommonTest {
+
+ @Test
+ public void testProcessorTrackerSendIdleReport() throws Exception {
+ TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
+ ProcessorTracker pt = new ProcessorTracker(req);
+ Thread.sleep(300000);
+ }
+
+ @Test
+ public void testTaskTrackerProcessorIdle() throws Exception {
+
+ ProcessorTrackerStatusReportReq req = ProcessorTrackerStatusReportReq.buildIdleReport(10086L);
+ ServerScheduleJobReq serverScheduleJobReq = TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.API);
+
+ TaskTracker taskTracker = TaskTracker.create(serverScheduleJobReq);
+ if (taskTracker != null) {
+ taskTracker.receiveProcessorTrackerHeartbeat(req);
+ }
+ }
+}
diff --git a/powerjob-worker/src/test/resources/oms-akka-test.conf b/powerjob-worker/src/test/resources/oms-akka-test.conf
index 7615d41a..ec3f1ee2 100644
--- a/powerjob-worker/src/test/resources/oms-akka-test.conf
+++ b/powerjob-worker/src/test/resources/oms-akka-test.conf
@@ -5,7 +5,7 @@ akka {
allow-java-serialization = off
serialization-bindings {
- "OmsSerializable" = jackson-cbor
+ "com.github.kfcfans.powerjob.common.OmsSerializable" = jackson-cbor
}
}
remote {