mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: FrequentTaskTracker auto set down worker's task status to failed
This commit is contained in:
parent
867b610e67
commit
3167f01cf7
@ -49,7 +49,7 @@ public abstract class ScriptProcessor implements BasicProcessor {
|
||||
throw new RuntimeException("create script file failed");
|
||||
}
|
||||
|
||||
// 如果是下载连接,则从网络获取
|
||||
// 如果是下载链接,则从网络获取
|
||||
for (String protocol : DOWNLOAD_PROTOCOL) {
|
||||
if (processorInfo.startsWith(protocol)) {
|
||||
FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000);
|
||||
@ -57,7 +57,7 @@ public abstract class ScriptProcessor implements BasicProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
// 持久化到本地
|
||||
// 非下载链接,为 processInfo 生成可执行文件
|
||||
try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) {
|
||||
bw.write(processorInfo);
|
||||
bw.flush();
|
||||
|
@ -290,10 +290,9 @@ public class CommonTaskTracker extends TaskTracker {
|
||||
List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
|
||||
if (!disconnectedPTs.isEmpty()) {
|
||||
log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
|
||||
boolean updateLostTasks = taskPersistenceService.updateLostTasks(disconnectedPTs);
|
||||
if (updateLostTasks) {
|
||||
if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) {
|
||||
ptStatusHolder.remove(disconnectedPTs);
|
||||
log.warn("[TaskTracker-{}] detective some worker is offline: {}", instanceId, disconnectedPTs);
|
||||
log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -228,6 +228,17 @@ public class FrequentTaskTracker extends TaskTracker {
|
||||
|
||||
private void checkStatus() {
|
||||
Stopwatch stopwatch = Stopwatch.createStarted();
|
||||
|
||||
// worker 挂掉的任务直接置为失败
|
||||
List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
|
||||
if (!disconnectedPTs.isEmpty()) {
|
||||
log.warn("[FQTaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
|
||||
if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, false)) {
|
||||
ptStatusHolder.remove(disconnectedPTs);
|
||||
log.warn("[FQTaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs);
|
||||
}
|
||||
}
|
||||
|
||||
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
|
||||
long instanceTimeoutMS = instanceInfo.getInstanceTimeoutMS();
|
||||
long nowTS = System.currentTimeMillis();
|
||||
|
@ -100,17 +100,24 @@ public class TaskPersistenceService {
|
||||
* set address = 'N/A', status = 0
|
||||
* where address in () and status not in (5,6)
|
||||
*/
|
||||
public boolean updateLostTasks(List<String> addressList) {
|
||||
public boolean updateLostTasks(Long instanceId, List<String> addressList, boolean retry) {
|
||||
|
||||
TaskDO updateEntity = new TaskDO();
|
||||
updateEntity.setLastModifiedTime(System.currentTimeMillis());
|
||||
if (retry) {
|
||||
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
|
||||
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
|
||||
updateEntity.setLastModifiedTime(System.currentTimeMillis());
|
||||
}else {
|
||||
updateEntity.setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue());
|
||||
updateEntity.setResult("maybe worker down");
|
||||
}
|
||||
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
String queryConditionFormat = "address in %s and status not in (%d, %d)";
|
||||
String queryCondition = String.format(queryConditionFormat, CommonUtils.getInStringCondition(addressList), TaskStatus.WORKER_PROCESS_FAILED.getValue(), TaskStatus.WORKER_PROCESS_SUCCESS.getValue());
|
||||
query.setQueryCondition(queryCondition);
|
||||
log.debug("[TaskPersistenceService] updateLostTasks-QUERY-SQL: {}", query.getQueryCondition());
|
||||
|
||||
try {
|
||||
return execute(() -> taskDAO.simpleUpdate(query, updateEntity));
|
||||
|
@ -73,7 +73,7 @@ public class PersistenceServiceTest {
|
||||
@Test
|
||||
public void testUpdateLostTasks() throws Exception {
|
||||
Thread.sleep(1000);
|
||||
boolean success = taskPersistenceService.updateLostTasks(Lists.newArrayList(NetUtils.getLocalHost()));
|
||||
boolean success = taskPersistenceService.updateLostTasks(10086L, Lists.newArrayList(NetUtils.getLocalHost()), true);
|
||||
System.out.println("updateLostTasks: " + success);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user