From 53025d6bb185f70404e89f6e1fbabb2bc370063b Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 23 Feb 2024 00:11:54 +0800 Subject: [PATCH] feat: ExternalTaskPersistenceService --- .../tech/powerjob/common/PowerJobDKey.java | 6 ++ .../tech/powerjob/worker/PowerJobWorker.java | 2 +- .../worker/actors/ProcessorTrackerActor.java | 2 +- .../worker/actors/TaskTrackerActor.java | 2 +- .../worker/common/ThreadLocalStore.java | 2 +- .../powerjob/worker/common/WorkerRuntime.java | 2 +- .../runnable/HeavyProcessorRunnable.java | 2 +- .../core/processor/sdk/MapProcessor.java | 2 +- .../tracker/processor/ProcessorTracker.java | 2 +- .../tracker/task/heavy/CommonTaskTracker.java | 16 +-- .../task/heavy/FrequentTaskTracker.java | 14 ++- .../tracker/task/heavy/HeavyTaskTracker.java | 54 ++++++++-- .../task/stat/CommittedTaskStatistics.java | 33 ++++++ .../task/stat/ExternalTaskStatistics.java | 31 ++++++ ...older.java => InstanceTaskStatistics.java} | 2 +- .../{ => db}/ConnectionFactory.java | 2 +- .../persistence/{ => db}/SimpleTaskQuery.java | 2 +- .../worker/persistence/{ => db}/TaskDAO.java | 2 +- .../persistence/{ => db}/TaskDAOImpl.java | 3 +- .../worker/persistence/{ => db}/TaskDO.java | 2 +- .../{ => db}/TaskPersistenceService.java | 2 +- .../fs/ExternalTaskPersistenceService.java | 22 ++++ .../worker/persistence/fs/FsService.java | 16 +++ ...ernalTaskFileSystemPersistenceService.java | 101 ++++++++++++++++++ .../fs/impl/LocalDiskFsService.java | 73 +++++++++++++ .../pojo/request/ProcessorMapTaskRequest.java | 2 +- .../pojo/request/TaskTrackerStartTaskReq.java | 2 +- .../persistence/AbstractTaskDAOTest.java | 1 + .../persistence/TaskDAOPerformanceTest.java | 2 +- .../worker/persistence/TaskDAOTest.java | 3 +- .../worker/test/PersistenceServiceTest.java | 4 +- 31 files changed, 361 insertions(+), 50 deletions(-) create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/CommittedTaskStatistics.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/ExternalTaskStatistics.java rename powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/{InstanceStatisticsHolder.java => InstanceTaskStatistics.java} (94%) rename powerjob-worker/src/main/java/tech/powerjob/worker/persistence/{ => db}/ConnectionFactory.java (98%) rename powerjob-worker/src/main/java/tech/powerjob/worker/persistence/{ => db}/SimpleTaskQuery.java (97%) rename powerjob-worker/src/main/java/tech/powerjob/worker/persistence/{ => db}/TaskDAO.java (96%) rename powerjob-worker/src/main/java/tech/powerjob/worker/persistence/{ => db}/TaskDAOImpl.java (99%) rename powerjob-worker/src/main/java/tech/powerjob/worker/persistence/{ => db}/TaskDO.java (98%) rename powerjob-worker/src/main/java/tech/powerjob/worker/persistence/{ => db}/TaskPersistenceService.java (99%) create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/ExternalTaskPersistenceService.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/FsService.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/LocalDiskFsService.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java index 92c3a353..10de773b 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java @@ -55,4 +55,10 @@ public class PowerJobDKey { */ public static final String FREQUENCY_JOB_MAX_INTERVAL = "powerjob.server.frequency-job.max-interval"; + /* 不太可能有人用的参数 */ + + /** + * 最大活跃任务数量,超出部分 SWAP 到磁盘以提升性能 + */ + public static final String WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM = "powerjob.worker.max-active-task-num"; } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index 404da4ca..d9890712 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -26,7 +26,7 @@ import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.utils.WorkerNetUtils; import tech.powerjob.worker.core.executor.ExecutorManager; import tech.powerjob.worker.extension.processor.ProcessorFactory; -import tech.powerjob.worker.persistence.TaskPersistenceService; +import tech.powerjob.worker.persistence.db.TaskPersistenceService; import tech.powerjob.worker.processor.PowerJobProcessorLoader; import tech.powerjob.worker.processor.ProcessorLoader; import tech.powerjob.worker.processor.impl.BuiltInDefaultProcessorFactory; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java index 846cfa37..6292e1b5 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java @@ -9,7 +9,7 @@ import tech.powerjob.remote.framework.actor.ProcessType; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager; import tech.powerjob.worker.core.tracker.processor.ProcessorTracker; -import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.persistence.db.TaskDO; import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java index 917f3abb..6d66bfe5 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java @@ -18,7 +18,7 @@ import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager; import tech.powerjob.worker.core.tracker.task.TaskTracker; import tech.powerjob.worker.core.tracker.task.heavy.HeavyTaskTracker; import tech.powerjob.worker.core.tracker.task.light.LightTaskTracker; -import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.persistence.db.TaskDO; import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest; import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/ThreadLocalStore.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/ThreadLocalStore.java index 3f6536ad..e8ea7074 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/ThreadLocalStore.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/ThreadLocalStore.java @@ -1,6 +1,6 @@ package tech.powerjob.worker.common; -import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.persistence.db.TaskDO; import java.util.concurrent.atomic.AtomicLong; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/WorkerRuntime.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/WorkerRuntime.java index 69d1af80..3e0ea722 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/WorkerRuntime.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/WorkerRuntime.java @@ -6,7 +6,7 @@ import tech.powerjob.remote.framework.transporter.Transporter; import tech.powerjob.worker.background.OmsLogHandler; import tech.powerjob.worker.background.discovery.ServerDiscoveryService; import tech.powerjob.worker.core.executor.ExecutorManager; -import tech.powerjob.worker.persistence.TaskPersistenceService; +import tech.powerjob.worker.persistence.db.TaskPersistenceService; import tech.powerjob.worker.processor.ProcessorLoader; import java.util.Optional; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java index af28b72d..31207dbd 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java @@ -21,7 +21,7 @@ import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor; import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor; import tech.powerjob.worker.extension.processor.ProcessorBean; import tech.powerjob.worker.log.OmsLogger; -import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.persistence.db.TaskDO; import tech.powerjob.worker.pojo.model.InstanceInfo; import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java index 1ec00177..1a3efa65 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java @@ -8,7 +8,7 @@ import tech.powerjob.worker.common.ThreadLocalStore; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.utils.TransportUtils; -import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.persistence.db.TaskDO; import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest; import java.util.List; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 3d7650cd..5a0652f7 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -19,7 +19,7 @@ import tech.powerjob.worker.extension.processor.ProcessorBean; import tech.powerjob.worker.extension.processor.ProcessorDefinition; import tech.powerjob.worker.log.OmsLogger; import tech.powerjob.worker.log.OmsLoggerFactory; -import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.persistence.db.TaskDO; import tech.powerjob.worker.pojo.model.InstanceInfo; import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java index aa71ba35..2404551c 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java @@ -18,8 +18,8 @@ import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.TransportUtils; -import tech.powerjob.worker.core.tracker.task.stat.InstanceStatisticsHolder; -import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.core.tracker.task.stat.InstanceTaskStatistics; +import tech.powerjob.worker.persistence.db.TaskDO; import java.util.List; import java.util.Optional; @@ -86,7 +86,7 @@ public class CommonTaskTracker extends HeavyTaskTracker { detail.setTaskTrackerAddress(workerRuntime.getWorkerAddress()); // 填充详细信息 - InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId); + InstanceTaskStatistics holder = getInstanceStatisticsHolder(instanceId); InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail(); taskDetail.setSucceedTaskNum(holder.getSucceedNum()); taskDetail.setFailedTaskNum(holder.getFailedNum()); @@ -105,18 +105,12 @@ public class CommonTaskTracker extends HeavyTaskTracker { private void persistenceRootTask() { TaskDO rootTask = new TaskDO(); - rootTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); - rootTask.setInstanceId(instanceInfo.getInstanceId()); rootTask.setTaskId(ROOT_TASK_ID); - rootTask.setFailedCnt(0); rootTask.setAddress(workerRuntime.getWorkerAddress()); rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME); - rootTask.setCreatedTime(System.currentTimeMillis()); - rootTask.setLastModifiedTime(System.currentTimeMillis()); - rootTask.setLastReportTime(-1L); rootTask.setSubInstanceId(instanceId); - if (taskPersistenceService.save(rootTask)) { + if (submitTask(Lists.newArrayList(rootTask))) { log.info("[TaskTracker-{}] create root task successfully.", instanceId); } else { log.error("[TaskTracker-{}] create root task failed.", instanceId); @@ -135,7 +129,7 @@ public class CommonTaskTracker extends HeavyTaskTracker { @SuppressWarnings("squid:S3776") private void innerRun() { - InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId); + InstanceTaskStatistics holder = getInstanceStatisticsHolder(instanceId); long finishedNum = holder.getFinishedNum(); long unfinishedNum = holder.getUnfinishedNum(); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java index 3b1a0bc5..c8345fb3 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java @@ -22,8 +22,8 @@ import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.LRUCache; import tech.powerjob.worker.common.utils.TransportUtils; -import tech.powerjob.worker.core.tracker.task.stat.InstanceStatisticsHolder; -import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.core.tracker.task.stat.InstanceTaskStatistics; +import tech.powerjob.worker.persistence.db.TaskDO; import java.util.*; import java.util.concurrent.Executors; @@ -184,13 +184,11 @@ public class FrequentTaskTracker extends HeavyTaskTracker { newRootTask.setTaskId(taskId); newRootTask.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue()); - newRootTask.setFailedCnt(0); + // 根任务总是默认本机执行 newRootTask.setAddress(myAddress); newRootTask.setTaskName(TaskConstant.ROOT_TASK_NAME); - newRootTask.setCreatedTime(System.currentTimeMillis()); - newRootTask.setLastModifiedTime(System.currentTimeMillis()); - newRootTask.setLastReportTime(-1L); + // 判断是否超出最大执行实例数 if (maxInstanceNum > 0) { @@ -204,7 +202,7 @@ public class FrequentTaskTracker extends HeavyTaskTracker { } // 必须先持久化,持久化成功才能 dispatch,否则会导致后续报错(因为DB中没有这个taskId对应的记录,会各种报错) - if (!taskPersistenceService.save(newRootTask)) { + if (!submitTask(Lists.newArrayList(newRootTask))) { log.error("[FQTaskTracker-{}] Launcher create new root task failed.", instanceId); processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED"); return; @@ -282,7 +280,7 @@ public class FrequentTaskTracker extends HeavyTaskTracker { } // 查看执行情况 - InstanceStatisticsHolder holder = getInstanceStatisticsHolder(subInstanceId); + InstanceTaskStatistics holder = getInstanceStatisticsHolder(subInstanceId); long finishedNum = holder.getFinishedNum(); long unfinishedNum = holder.getUnfinishedNum(); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index dc8c49f3..5ae98814 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -9,6 +9,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import tech.powerjob.common.PowerJobDKey; import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.enhance.SafeRunnable; import tech.powerjob.common.enums.ExecuteType; @@ -28,9 +29,11 @@ import tech.powerjob.worker.common.utils.WorkflowContextUtils; import tech.powerjob.worker.core.ha.ProcessorTrackerStatusHolder; import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager; import tech.powerjob.worker.core.tracker.task.TaskTracker; -import tech.powerjob.worker.core.tracker.task.stat.InstanceStatisticsHolder; -import tech.powerjob.worker.persistence.TaskDO; -import tech.powerjob.worker.persistence.TaskPersistenceService; +import tech.powerjob.worker.core.tracker.task.stat.CommittedTaskStatistics; +import tech.powerjob.worker.core.tracker.task.stat.ExternalTaskStatistics; +import tech.powerjob.worker.core.tracker.task.stat.InstanceTaskStatistics; +import tech.powerjob.worker.persistence.db.TaskDO; +import tech.powerjob.worker.persistence.db.TaskPersistenceService; import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq; @@ -68,6 +71,16 @@ public abstract class HeavyTaskTracker extends TaskTracker { */ private final Cache taskId2BriefInfo; + /** + * 任务统计相关 + */ + private final ExternalTaskStatistics externalTaskStatistics; + private final CommittedTaskStatistics committedTaskStatistics; + + /** + * 运行时最大任务数量,超出 SWAP 到磁盘 + */ + private final long maxActiveTaskNum; /** * 分段锁 @@ -87,6 +100,11 @@ public abstract class HeavyTaskTracker extends TaskTracker { // 构建缓存 taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).softValues().build(); + // 初始化统计参数 + this.externalTaskStatistics = new ExternalTaskStatistics(); + this.committedTaskStatistics = new CommittedTaskStatistics(); + this.maxActiveTaskNum = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM, "100000")); + // 构建分段锁 segmentLock = new SegmentLock(UPDATE_CONCURRENCY); @@ -275,16 +293,36 @@ public abstract class HeavyTaskTracker extends TaskTracker { } // 基础处理(多循环一次虽然有些浪费,但分布式执行中,这点耗时绝不是主要占比,忽略不计!) newTaskList.forEach(task -> { + + // 秒级任务持久化的同时直接派发,有直接的状态 + if (task.getStatus() == null) { + task.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); + } task.setInstanceId(instanceId); - task.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); task.setFailedCnt(0); task.setLastModifiedTime(System.currentTimeMillis()); task.setCreatedTime(System.currentTimeMillis()); task.setLastReportTime(-1L); }); - log.debug("[TaskTracker-{}] receive new tasks: {}", instanceId, newTaskList); - return taskPersistenceService.batchSave(newTaskList); + long totalCommittedNum = committedTaskStatistics.getTotalCommittedNum(); + log.debug("[TaskTracker-{}] current committed num: {}, receive new tasks: {}", instanceId, totalCommittedNum, newTaskList); + + boolean saveResult = true; + if (totalCommittedNum > maxActiveTaskNum) { + log.info("[TaskTracker-{}] totalCommittedNum({}) > maxActiveTaskNum({}), persist task to disk", instanceId, totalCommittedNum, maxActiveTaskNum); + } else { + saveResult = taskPersistenceService.batchSave(newTaskList); + } + + + if (saveResult) { + committedTaskStatistics.getSucceedNum().add(newTaskList.size()); + } else { + committedTaskStatistics.getFailedNum().add(newTaskList.size()); + log.error("[TaskTracker-{}] batchSave new tasks failed, please check the log", instanceId); + } + return saveResult; } /** @@ -428,10 +466,10 @@ public abstract class HeavyTaskTracker extends TaskTracker { * @param subInstanceId 子任务实例ID * @return InstanceStatisticsHolder */ - protected InstanceStatisticsHolder getInstanceStatisticsHolder(long subInstanceId) { + protected InstanceTaskStatistics getInstanceStatisticsHolder(long subInstanceId) { Map status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId); - InstanceStatisticsHolder holder = new InstanceStatisticsHolder(); + InstanceTaskStatistics holder = new InstanceTaskStatistics(); holder.setWaitingDispatchNum(status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L)); holder.setWorkerUnreceivedNum(status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L)); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/CommittedTaskStatistics.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/CommittedTaskStatistics.java new file mode 100644 index 00000000..b5eabe1b --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/CommittedTaskStatistics.java @@ -0,0 +1,33 @@ +package tech.powerjob.worker.core.tracker.task.stat; + +import lombok.Data; + +import java.io.Serializable; +import java.util.concurrent.atomic.LongAdder; + +/** + * 已提交的任务数量 + * + * @author tjq + * @since 2024/2/21 + */ +@Data +public class CommittedTaskStatistics implements Serializable { + + /** + * 提交成功的数量 + */ + private LongAdder succeedNum = new LongAdder(); + /** + * 提交失败的数量 + */ + private LongAdder failedNum = new LongAdder(); + + /** + * 获取全部的提交任务数量 + * @return 提交任务数量 + */ + public long getTotalCommittedNum() { + return succeedNum.sum() + failedNum.sum(); + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/ExternalTaskStatistics.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/ExternalTaskStatistics.java new file mode 100644 index 00000000..50bfaddb --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/ExternalTaskStatistics.java @@ -0,0 +1,31 @@ +package tech.powerjob.worker.core.tracker.task.stat; + +import lombok.Data; + +import java.io.Serializable; +import java.util.concurrent.atomic.LongAdder; + +/** + * 外部任务(未持久化到运行时)统计 + * + * @author tjq + * @since 2024/2/21 + */ +@Data +public class ExternalTaskStatistics implements Serializable { + + /** + * 等待交换进入的运行时的数量 + */ + private LongAdder waitSwapInNum = new LongAdder(); + + /** + * 运行成功,交换外部的数量 + */ + private LongAdder succeedSwapOutNum = new LongAdder(); + + /** + * 盖棺定论失败,交换到外部的数量 + */ + private LongAdder failedSwapOutNum = new LongAdder(); +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/InstanceStatisticsHolder.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/InstanceTaskStatistics.java similarity index 94% rename from powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/InstanceStatisticsHolder.java rename to powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/InstanceTaskStatistics.java index 973cb80e..166fab90 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/InstanceStatisticsHolder.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/stat/InstanceTaskStatistics.java @@ -11,7 +11,7 @@ import java.io.Serializable; * @since 2024/2/21 */ @Data -public class InstanceStatisticsHolder implements Serializable { +public class InstanceTaskStatistics implements Serializable { /** * 等待派发状态(仅存在 TaskTracker 数据库中) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/ConnectionFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/ConnectionFactory.java similarity index 98% rename from powerjob-worker/src/main/java/tech/powerjob/worker/persistence/ConnectionFactory.java rename to powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/ConnectionFactory.java index bd502361..1e8f95b7 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/ConnectionFactory.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/ConnectionFactory.java @@ -1,4 +1,4 @@ -package tech.powerjob.worker.persistence; +package tech.powerjob.worker.persistence.db; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.JavaUtils; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SimpleTaskQuery.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java similarity index 97% rename from powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SimpleTaskQuery.java rename to powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java index 0e9aec88..b811fb7b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SimpleTaskQuery.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/SimpleTaskQuery.java @@ -1,4 +1,4 @@ -package tech.powerjob.worker.persistence; +package tech.powerjob.worker.persistence.db; import lombok.Data; import org.apache.commons.lang3.StringUtils; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAO.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAO.java similarity index 96% rename from powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAO.java rename to powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAO.java index 1edf7e5d..8497c186 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAO.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAO.java @@ -1,4 +1,4 @@ -package tech.powerjob.worker.persistence; +package tech.powerjob.worker.persistence.db; import tech.powerjob.worker.core.processor.TaskResult; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAOImpl.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAOImpl.java similarity index 99% rename from powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAOImpl.java rename to powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAOImpl.java index 9863831d..fa893e9d 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAOImpl.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDAOImpl.java @@ -1,10 +1,9 @@ -package tech.powerjob.worker.persistence; +package tech.powerjob.worker.persistence.db; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.processor.TaskResult; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import lombok.AllArgsConstructor; import java.sql.*; import java.util.Collection; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDO.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDO.java similarity index 98% rename from powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDO.java rename to powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDO.java index 7d815148..da5e60cf 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDO.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskDO.java @@ -1,4 +1,4 @@ -package tech.powerjob.worker.persistence; +package tech.powerjob.worker.persistence.db; import lombok.Getter; import lombok.Setter; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskPersistenceService.java similarity index 99% rename from powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java rename to powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskPersistenceService.java index 42bdc8fe..3f1e2f09 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/db/TaskPersistenceService.java @@ -1,4 +1,4 @@ -package tech.powerjob.worker.persistence; +package tech.powerjob.worker.persistence.db; import com.google.common.collect.Lists; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/ExternalTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/ExternalTaskPersistenceService.java new file mode 100644 index 00000000..4b150b37 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/ExternalTaskPersistenceService.java @@ -0,0 +1,22 @@ +package tech.powerjob.worker.persistence.fs; + +import tech.powerjob.worker.persistence.db.TaskDO; + +import java.util.List; + +/** + * 外部任务持久化服务 + * + * @author tjq + * @since 2024/2/22 + */ +public interface ExternalTaskPersistenceService extends AutoCloseable { + + boolean persistPendingTask(List tasks); + + List readPendingTask(); + + boolean persistFinishedTask(List tasks); + + List readFinishedTask(); +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/FsService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/FsService.java new file mode 100644 index 00000000..666b7767 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/FsService.java @@ -0,0 +1,16 @@ +package tech.powerjob.worker.persistence.fs; + +import java.io.IOException; + +/** + * FileSystemService + * + * @author tjq + * @since 2024/2/22 + */ +public interface FsService extends AutoCloseable { + + void writeLine(String content) throws IOException; + + String readLine() throws IOException; +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java new file mode 100644 index 00000000..79086591 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java @@ -0,0 +1,101 @@ +package tech.powerjob.worker.persistence.fs.impl; + +import com.google.common.collect.Lists; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.worker.persistence.db.TaskDO; +import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService; +import tech.powerjob.worker.persistence.fs.FsService; + +import java.util.Collections; +import java.util.List; + +/** + * 外部文件存储服务 + * + * @author tjq + * @since 2024/2/22 + */ +@Slf4j +public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPersistenceService { + private final Long instanceId; + private final Long subInstanceId; + + private final FsService pendingFsService; + + private final FsService resultFsService; + + private static final String PENDING_FILE_NAME = "%d_%d-pending"; + private static final String RESULT_FILE_NAME = "%d_%d-result"; + + public ExternalTaskFileSystemPersistenceService(Long instanceId, Long subInstanceId) { + this.instanceId = instanceId; + this.subInstanceId = subInstanceId; + + this.pendingFsService = new LocalDiskFsService(String.format(PENDING_FILE_NAME, instanceId, subInstanceId)); + this.resultFsService = new LocalDiskFsService(String.format(RESULT_FILE_NAME, instanceId, subInstanceId)); + } + + @Override + public boolean persistPendingTask(List tasks) { + try { + String content = JsonUtils.toJSONString(tasks); + pendingFsService.writeLine(content); + return true; + } catch (Exception e) { + log.error("[ExternalTaskPersistenceService] [{}-{}] persistPendingTask failed: {}", instanceId, subInstanceId, tasks); + } + return false; + } + + @Override + @SneakyThrows + public List readPendingTask() { + String pendingTaskStr = pendingFsService.readLine(); + TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class); + if (taskDOS != null) { + return Lists.newArrayList(taskDOS); + } + return Collections.emptyList(); + } + + @Override + public boolean persistFinishedTask(List tasks) { + try { + String content = JsonUtils.toJSONString(tasks); + resultFsService.writeLine(content); + return true; + } catch (Exception e) { + log.error("[ExternalTaskPersistenceService] [{}-{}] persistPendingTask failed: {}", instanceId, subInstanceId, tasks); + } + return false; + } + + @Override + @SneakyThrows + public List readFinishedTask() { + String pendingTaskStr = resultFsService.readLine(); + TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class); + if (taskDOS != null) { + return Lists.newArrayList(taskDOS); + } + return Collections.emptyList(); + } + + @Override + public void close() throws Exception { + CommonUtils.executeIgnoreException(() -> { + if (pendingFsService != null) { + pendingFsService.close(); + } + }); + + CommonUtils.executeIgnoreException(() -> { + if (resultFsService != null) { + resultFsService.close(); + } + }); + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/LocalDiskFsService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/LocalDiskFsService.java new file mode 100644 index 00000000..1ad6d0ed --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/LocalDiskFsService.java @@ -0,0 +1,73 @@ +package tech.powerjob.worker.persistence.fs.impl; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.worker.common.utils.PowerFileUtils; +import tech.powerjob.worker.persistence.fs.FsService; + +import java.io.*; + +/** + * 本地磁盘 + * + * @author tjq + * @since 2024/2/22 + */ +@Slf4j +public class LocalDiskFsService implements FsService { + + private static final String WORKSPACE_PATH = PowerFileUtils.workspace() + "/fs/" + CommonUtils.genUUID() + "/"; + + private static final String FILE_NAME_PATTERN = "%s.powerjob"; + + + private final BufferedWriter bufferedWriter; + + private final BufferedReader bufferedReader; + + @SneakyThrows + public LocalDiskFsService(String keyword) { + String fileName = String.format(FILE_NAME_PATTERN, keyword); + String filePath = WORKSPACE_PATH.concat(fileName); + + File file = new File(filePath); + FileUtils.createParentDirectories(file); + + // 在使用 BufferedReader 包装 FileReader 的情况下,不需要单独关闭 FileReader。当你调用 BufferedReader 的 close() 方法时,它会负责关闭它所包装的 FileReader。这是因为 BufferedReader.close() 方法内部会调用它所包装的流的 close() 方法,确保所有相关资源都被释放,包括底层的文件句柄 + FileWriter fileWriter = new FileWriter(file); + this.bufferedWriter = new BufferedWriter(fileWriter); + this.bufferedReader = new BufferedReader(new FileReader(file)); + + log.info("[LocalDiskFsService] new LocalDiskFsService successfully, path: {}", filePath); + } + + @Override + public void writeLine(String content) throws IOException { + bufferedWriter.write(content); + bufferedWriter.newLine(); + bufferedWriter.flush(); + } + + @Override + public String readLine() throws IOException { + return bufferedReader.readLine(); + } + + @Override + public void close() throws Exception { + + CommonUtils.executeIgnoreException(() -> { + if (bufferedWriter != null) { + bufferedWriter.close(); + } + }); + + CommonUtils.executeIgnoreException(() -> { + if (bufferedReader != null) { + bufferedReader.close(); + } + }); + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/ProcessorMapTaskRequest.java b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/ProcessorMapTaskRequest.java index c35bc924..fed844fa 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/ProcessorMapTaskRequest.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/ProcessorMapTaskRequest.java @@ -3,7 +3,7 @@ package tech.powerjob.worker.pojo.request; import tech.powerjob.common.PowerSerializable; import tech.powerjob.worker.common.ThreadLocalStore; import tech.powerjob.common.serialize.SerializerUtils; -import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.persistence.db.TaskDO; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Getter; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java index 7481ce5e..2e377ad5 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java @@ -1,7 +1,7 @@ package tech.powerjob.worker.pojo.request; import tech.powerjob.common.PowerSerializable; -import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.persistence.db.TaskDO; import tech.powerjob.worker.pojo.model.InstanceInfo; import lombok.Getter; import lombok.NoArgsConstructor; diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/AbstractTaskDAOTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/AbstractTaskDAOTest.java index 9e15ceb5..5ec3af72 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/AbstractTaskDAOTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/AbstractTaskDAOTest.java @@ -1,6 +1,7 @@ package tech.powerjob.worker.persistence; import tech.powerjob.worker.common.constants.TaskStatus; +import tech.powerjob.worker.persistence.db.TaskDO; import java.nio.charset.StandardCharsets; diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOPerformanceTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOPerformanceTest.java index 39014fc7..51db01bc 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOPerformanceTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOPerformanceTest.java @@ -2,12 +2,12 @@ package tech.powerjob.worker.persistence; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.util.StopWatch; import tech.powerjob.worker.common.constants.StoreStrategy; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.processor.TaskResult; +import tech.powerjob.worker.persistence.db.*; import java.util.List; import java.util.Map; diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOTest.java index 7c02082f..36058f8e 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOTest.java @@ -3,14 +3,13 @@ package tech.powerjob.worker.persistence; import com.google.common.collect.Lists; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import tech.powerjob.worker.common.constants.StoreStrategy; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.processor.TaskResult; +import tech.powerjob.worker.persistence.db.*; -import java.nio.charset.StandardCharsets; import java.sql.SQLIntegrityConstraintViolationException; import java.util.List; import java.util.Map; diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java index 52357f3e..a1b0d142 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java @@ -3,8 +3,8 @@ package tech.powerjob.worker.test; import tech.powerjob.worker.common.constants.StoreStrategy; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.worker.persistence.TaskDO; -import tech.powerjob.worker.persistence.TaskPersistenceService; +import tech.powerjob.worker.persistence.db.TaskDO; +import tech.powerjob.worker.persistence.db.TaskPersistenceService; import com.google.common.collect.Lists; import org.junit.jupiter.api.*;