diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/CommonUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/CommonUtils.java index 498d5b73..19dd3fe5 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/utils/CommonUtils.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/CommonUtils.java @@ -92,14 +92,16 @@ public class CommonUtils { public static void executeIgnoreException(SupplierPlus executor) { try { executor.get(); - }catch (Exception ignore) { + }catch (Exception e) { + log.warn("executeIgnoreException but exception!", e); } } public static void executeIgnoreException(Meaningless executor) { try { executor.m(); - }catch (Exception ignore) { + }catch (Exception e) { + log.warn("executeIgnoreException but exception!", e); } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 5ae98814..cc55e500 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -34,6 +34,8 @@ import tech.powerjob.worker.core.tracker.task.stat.ExternalTaskStatistics; import tech.powerjob.worker.core.tracker.task.stat.InstanceTaskStatistics; import tech.powerjob.worker.persistence.db.TaskDO; import tech.powerjob.worker.persistence.db.TaskPersistenceService; +import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService; +import tech.powerjob.worker.persistence.fs.impl.ExternalTaskFileSystemPersistenceService; import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq; @@ -62,6 +64,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { * 数据库持久化服务 */ protected final TaskPersistenceService taskPersistenceService; + protected ExternalTaskPersistenceService externalTaskPersistenceService; /** * 定时任务线程池 */ @@ -308,14 +311,13 @@ public abstract class HeavyTaskTracker extends TaskTracker { long totalCommittedNum = committedTaskStatistics.getTotalCommittedNum(); log.debug("[TaskTracker-{}] current committed num: {}, receive new tasks: {}", instanceId, totalCommittedNum, newTaskList); - boolean saveResult = true; + boolean saveResult; if (totalCommittedNum > maxActiveTaskNum) { - log.info("[TaskTracker-{}] totalCommittedNum({}) > maxActiveTaskNum({}), persist task to disk", instanceId, totalCommittedNum, maxActiveTaskNum); + saveResult = getExternalTaskPersistenceService().persistPendingTask(newTaskList); } else { saveResult = taskPersistenceService.batchSave(newTaskList); } - if (saveResult) { committedTaskStatistics.getSucceedNum().add(newTaskList.size()); } else { @@ -574,6 +576,14 @@ public abstract class HeavyTaskTracker extends TaskTracker { } } + protected class YuGong extends SafeRunnable { + + @Override + protected void run0() { + + } + } + @Data @AllArgsConstructor protected static class TaskBriefInfo { @@ -591,4 +601,21 @@ public abstract class HeavyTaskTracker extends TaskTracker { * @param req 服务器调度任务实例运行请求 */ protected abstract void initTaskTracker(ServerScheduleJobReq req); + + protected ExternalTaskPersistenceService getExternalTaskPersistenceService() { + if (externalTaskPersistenceService != null) { + return externalTaskPersistenceService; + } + synchronized (this) { + if (externalTaskPersistenceService != null) { + return externalTaskPersistenceService; + } + + boolean needResult = ExecuteType.MAP_REDUCE.equals(executeType); + + log.info("[TaskTracker-{}] totalCommittedNum({}) > maxActiveTaskNum({}), start to use ExternalTaskPersistenceService", instanceId, committedTaskStatistics.getTotalCommittedNum(), maxActiveTaskNum); + externalTaskPersistenceService = new ExternalTaskFileSystemPersistenceService(instanceId, needResult); + return externalTaskPersistenceService; + } + } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/ExternalTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/ExternalTaskPersistenceService.java index 4b150b37..3ba559f4 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/ExternalTaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/ExternalTaskPersistenceService.java @@ -2,6 +2,7 @@ package tech.powerjob.worker.persistence.fs; import tech.powerjob.worker.persistence.db.TaskDO; +import java.io.Closeable; import java.util.List; /** @@ -10,7 +11,7 @@ import java.util.List; * @author tjq * @since 2024/2/22 */ -public interface ExternalTaskPersistenceService extends AutoCloseable { +public interface ExternalTaskPersistenceService extends Closeable { boolean persistPendingTask(List tasks); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/FsService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/FsService.java index 666b7767..83af20b1 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/FsService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/FsService.java @@ -1,5 +1,6 @@ package tech.powerjob.worker.persistence.fs; +import java.io.Closeable; import java.io.IOException; /** @@ -8,7 +9,7 @@ import java.io.IOException; * @author tjq * @since 2024/2/22 */ -public interface FsService extends AutoCloseable { +public interface FsService extends Closeable { void writeLine(String content) throws IOException; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java index 79086591..e0ffac47 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java @@ -9,6 +9,7 @@ import tech.powerjob.worker.persistence.db.TaskDO; import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService; import tech.powerjob.worker.persistence.fs.FsService; +import java.io.IOException; import java.util.Collections; import java.util.List; @@ -21,21 +22,35 @@ import java.util.List; @Slf4j public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPersistenceService { private final Long instanceId; - private final Long subInstanceId; private final FsService pendingFsService; private final FsService resultFsService; - private static final String PENDING_FILE_NAME = "%d_%d-pending"; - private static final String RESULT_FILE_NAME = "%d_%d-result"; + private static final String PENDING_FILE_NAME = "%d-pending"; + private static final String RESULT_FILE_NAME = "%d-result"; - public ExternalTaskFileSystemPersistenceService(Long instanceId, Long subInstanceId) { + public ExternalTaskFileSystemPersistenceService(Long instanceId, boolean needResult) { this.instanceId = instanceId; - this.subInstanceId = subInstanceId; - this.pendingFsService = new LocalDiskFsService(String.format(PENDING_FILE_NAME, instanceId, subInstanceId)); - this.resultFsService = new LocalDiskFsService(String.format(RESULT_FILE_NAME, instanceId, subInstanceId)); + this.pendingFsService = new LocalDiskFsService(String.format(PENDING_FILE_NAME, instanceId)); + if (needResult) { + this.resultFsService = new LocalDiskFsService(String.format(RESULT_FILE_NAME, instanceId)); + } else { + this.resultFsService = new FsService() { + @Override + public void writeLine(String content) throws IOException { + } + + @Override + public String readLine() throws IOException { + return null; + } + @Override + public void close() { + } + }; + } } @Override @@ -45,7 +60,7 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer pendingFsService.writeLine(content); return true; } catch (Exception e) { - log.error("[ExternalTaskPersistenceService] [{}-{}] persistPendingTask failed: {}", instanceId, subInstanceId, tasks); + log.error("[ExternalTaskPersistenceService] [{}] persistPendingTask failed: {}", instanceId, tasks); } return false; } @@ -68,7 +83,7 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer resultFsService.writeLine(content); return true; } catch (Exception e) { - log.error("[ExternalTaskPersistenceService] [{}-{}] persistPendingTask failed: {}", instanceId, subInstanceId, tasks); + log.error("[ExternalTaskPersistenceService] [{}] persistPendingTask failed: {}", instanceId, tasks); } return false; } @@ -85,7 +100,7 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer } @Override - public void close() throws Exception { + public void close() { CommonUtils.executeIgnoreException(() -> { if (pendingFsService != null) { pendingFsService.close(); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/LocalDiskFsService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/LocalDiskFsService.java index 1ad6d0ed..b267646f 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/LocalDiskFsService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/LocalDiskFsService.java @@ -22,7 +22,7 @@ public class LocalDiskFsService implements FsService { private static final String FILE_NAME_PATTERN = "%s.powerjob"; - + private final File file; private final BufferedWriter bufferedWriter; private final BufferedReader bufferedReader; @@ -32,7 +32,7 @@ public class LocalDiskFsService implements FsService { String fileName = String.format(FILE_NAME_PATTERN, keyword); String filePath = WORKSPACE_PATH.concat(fileName); - File file = new File(filePath); + this.file = new File(filePath); FileUtils.createParentDirectories(file); // 在使用 BufferedReader 包装 FileReader 的情况下,不需要单独关闭 FileReader。当你调用 BufferedReader 的 close() 方法时,它会负责关闭它所包装的 FileReader。这是因为 BufferedReader.close() 方法内部会调用它所包装的流的 close() 方法,确保所有相关资源都被释放,包括底层的文件句柄 @@ -43,20 +43,30 @@ public class LocalDiskFsService implements FsService { log.info("[LocalDiskFsService] new LocalDiskFsService successfully, path: {}", filePath); } + /** + * 按行写数据,线程不安全,考虑到此处不用太在意性能,直接 synchronized + * @param content 内容 + * @throws IOException 异常 + */ @Override - public void writeLine(String content) throws IOException { + public synchronized void writeLine(String content) throws IOException { bufferedWriter.write(content); bufferedWriter.newLine(); bufferedWriter.flush(); } + /** + * 按行读数据,线程不安全,考虑到此处不用太在意性能,直接 synchronized + * @return 内容 + * @throws IOException 异常 + */ @Override - public String readLine() throws IOException { + public synchronized String readLine() throws IOException { return bufferedReader.readLine(); } @Override - public void close() throws Exception { + public void close() { CommonUtils.executeIgnoreException(() -> { if (bufferedWriter != null) { @@ -69,5 +79,10 @@ public class LocalDiskFsService implements FsService { bufferedReader.close(); } }); + + CommonUtils.executeIgnoreException(() -> { + boolean delete = file.delete(); + log.info("[LocalDiskFsService] delete file[{}] result: {}", file, delete); + }); } }