From 3167f01cf71790ccf3913d3dbb7ae3ed80e7594f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctjq=E2=80=9D?= Date: Sun, 18 Oct 2020 00:21:18 +0800 Subject: [PATCH] feat: FrequentTaskTracker auto set down worker's task status to failed --- .../core/processor/built/ScriptProcessor.java | 4 ++-- .../worker/core/tracker/task/CommonTaskTracker.java | 5 ++--- .../core/tracker/task/FrequentTaskTracker.java | 11 +++++++++++ .../worker/persistence/TaskPersistenceService.java | 13 ++++++++++--- .../kfcfans/powerjob/PersistenceServiceTest.java | 2 +- 5 files changed, 26 insertions(+), 9 deletions(-) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java index 2780f86c..9b84bb5f 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java @@ -49,7 +49,7 @@ public abstract class ScriptProcessor implements BasicProcessor { throw new RuntimeException("create script file failed"); } - // 如果是下载连接,则从网络获取 + // 如果是下载链接,则从网络获取 for (String protocol : DOWNLOAD_PROTOCOL) { if (processorInfo.startsWith(protocol)) { FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000); @@ -57,7 +57,7 @@ public abstract class ScriptProcessor implements BasicProcessor { } } - // 持久化到本地 + // 非下载链接,为 processInfo 生成可执行文件 try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) { bw.write(processorInfo); bw.flush(); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index 91188af4..513b8722 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -290,10 +290,9 @@ public class CommonTaskTracker extends TaskTracker { List disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers(); if (!disconnectedPTs.isEmpty()) { log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs); - boolean updateLostTasks = taskPersistenceService.updateLostTasks(disconnectedPTs); - if (updateLostTasks) { + if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) { ptStatusHolder.remove(disconnectedPTs); - log.warn("[TaskTracker-{}] detective some worker is offline: {}", instanceId, disconnectedPTs); + log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs); } } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index 00903371..9fa9b34b 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -228,6 +228,17 @@ public class FrequentTaskTracker extends TaskTracker { private void checkStatus() { Stopwatch stopwatch = Stopwatch.createStarted(); + + // worker 挂掉的任务直接置为失败 + List disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers(); + if (!disconnectedPTs.isEmpty()) { + log.warn("[FQTaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs); + if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, false)) { + ptStatusHolder.remove(disconnectedPTs); + log.warn("[FQTaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs); + } + } + ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); long instanceTimeoutMS = instanceInfo.getInstanceTimeoutMS(); long nowTS = System.currentTimeMillis(); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java index b967af3e..e5f841d3 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java @@ -100,17 +100,24 @@ public class TaskPersistenceService { * set address = 'N/A', status = 0 * where address in () and status not in (5,6) */ - public boolean updateLostTasks(List addressList) { + public boolean updateLostTasks(Long instanceId, List addressList, boolean retry) { TaskDO updateEntity = new TaskDO(); - updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); - updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); updateEntity.setLastModifiedTime(System.currentTimeMillis()); + if (retry) { + updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); + updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); + }else { + updateEntity.setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue()); + updateEntity.setResult("maybe worker down"); + } SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); String queryConditionFormat = "address in %s and status not in (%d, %d)"; String queryCondition = String.format(queryConditionFormat, CommonUtils.getInStringCondition(addressList), TaskStatus.WORKER_PROCESS_FAILED.getValue(), TaskStatus.WORKER_PROCESS_SUCCESS.getValue()); query.setQueryCondition(queryCondition); + log.debug("[TaskPersistenceService] updateLostTasks-QUERY-SQL: {}", query.getQueryCondition()); try { return execute(() -> taskDAO.simpleUpdate(query, updateEntity)); diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java index 8f0482fb..368947c1 100644 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java +++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java @@ -73,7 +73,7 @@ public class PersistenceServiceTest { @Test public void testUpdateLostTasks() throws Exception { Thread.sleep(1000); - boolean success = taskPersistenceService.updateLostTasks(Lists.newArrayList(NetUtils.getLocalHost())); + boolean success = taskPersistenceService.updateLostTasks(10086L, Lists.newArrayList(NetUtils.getLocalHost()), true); System.out.println("updateLostTasks: " + success); }