mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: ExternalTaskPersistenceService
This commit is contained in:
parent
90b740e325
commit
53025d6bb1
@ -55,4 +55,10 @@ public class PowerJobDKey {
|
||||
*/
|
||||
public static final String FREQUENCY_JOB_MAX_INTERVAL = "powerjob.server.frequency-job.max-interval";
|
||||
|
||||
/* 不太可能有人用的参数 */
|
||||
|
||||
/**
|
||||
* 最大活跃任务数量,超出部分 SWAP 到磁盘以提升性能
|
||||
*/
|
||||
public static final String WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM = "powerjob.worker.max-active-task-num";
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ import tech.powerjob.worker.common.WorkerRuntime;
|
||||
import tech.powerjob.worker.common.utils.WorkerNetUtils;
|
||||
import tech.powerjob.worker.core.executor.ExecutorManager;
|
||||
import tech.powerjob.worker.extension.processor.ProcessorFactory;
|
||||
import tech.powerjob.worker.persistence.TaskPersistenceService;
|
||||
import tech.powerjob.worker.persistence.db.TaskPersistenceService;
|
||||
import tech.powerjob.worker.processor.PowerJobProcessorLoader;
|
||||
import tech.powerjob.worker.processor.ProcessorLoader;
|
||||
import tech.powerjob.worker.processor.impl.BuiltInDefaultProcessorFactory;
|
||||
|
@ -9,7 +9,7 @@ import tech.powerjob.remote.framework.actor.ProcessType;
|
||||
import tech.powerjob.worker.common.WorkerRuntime;
|
||||
import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager;
|
||||
import tech.powerjob.worker.core.tracker.processor.ProcessorTracker;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq;
|
||||
|
||||
|
@ -18,7 +18,7 @@ import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager;
|
||||
import tech.powerjob.worker.core.tracker.task.TaskTracker;
|
||||
import tech.powerjob.worker.core.tracker.task.heavy.HeavyTaskTracker;
|
||||
import tech.powerjob.worker.core.tracker.task.light.LightTaskTracker;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest;
|
||||
import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
|
||||
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
|
||||
|
@ -1,6 +1,6 @@
|
||||
package tech.powerjob.worker.common;
|
||||
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
|
@ -6,7 +6,7 @@ import tech.powerjob.remote.framework.transporter.Transporter;
|
||||
import tech.powerjob.worker.background.OmsLogHandler;
|
||||
import tech.powerjob.worker.background.discovery.ServerDiscoveryService;
|
||||
import tech.powerjob.worker.core.executor.ExecutorManager;
|
||||
import tech.powerjob.worker.persistence.TaskPersistenceService;
|
||||
import tech.powerjob.worker.persistence.db.TaskPersistenceService;
|
||||
import tech.powerjob.worker.processor.ProcessorLoader;
|
||||
|
||||
import java.util.Optional;
|
||||
|
@ -21,7 +21,7 @@ import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
|
||||
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
|
||||
import tech.powerjob.worker.extension.processor.ProcessorBean;
|
||||
import tech.powerjob.worker.log.OmsLogger;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
import tech.powerjob.worker.pojo.model.InstanceInfo;
|
||||
import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
|
||||
|
||||
|
@ -8,7 +8,7 @@ import tech.powerjob.worker.common.ThreadLocalStore;
|
||||
import tech.powerjob.worker.common.WorkerRuntime;
|
||||
import tech.powerjob.worker.common.constants.TaskConstant;
|
||||
import tech.powerjob.worker.common.utils.TransportUtils;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -19,7 +19,7 @@ import tech.powerjob.worker.extension.processor.ProcessorBean;
|
||||
import tech.powerjob.worker.extension.processor.ProcessorDefinition;
|
||||
import tech.powerjob.worker.log.OmsLogger;
|
||||
import tech.powerjob.worker.log.OmsLoggerFactory;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
import tech.powerjob.worker.pojo.model.InstanceInfo;
|
||||
import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
|
||||
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
|
||||
|
@ -18,8 +18,8 @@ import tech.powerjob.worker.common.WorkerRuntime;
|
||||
import tech.powerjob.worker.common.constants.TaskConstant;
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.common.utils.TransportUtils;
|
||||
import tech.powerjob.worker.core.tracker.task.stat.InstanceStatisticsHolder;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.core.tracker.task.stat.InstanceTaskStatistics;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -86,7 +86,7 @@ public class CommonTaskTracker extends HeavyTaskTracker {
|
||||
detail.setTaskTrackerAddress(workerRuntime.getWorkerAddress());
|
||||
|
||||
// 填充详细信息
|
||||
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
|
||||
InstanceTaskStatistics holder = getInstanceStatisticsHolder(instanceId);
|
||||
InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail();
|
||||
taskDetail.setSucceedTaskNum(holder.getSucceedNum());
|
||||
taskDetail.setFailedTaskNum(holder.getFailedNum());
|
||||
@ -105,18 +105,12 @@ public class CommonTaskTracker extends HeavyTaskTracker {
|
||||
private void persistenceRootTask() {
|
||||
|
||||
TaskDO rootTask = new TaskDO();
|
||||
rootTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
|
||||
rootTask.setInstanceId(instanceInfo.getInstanceId());
|
||||
rootTask.setTaskId(ROOT_TASK_ID);
|
||||
rootTask.setFailedCnt(0);
|
||||
rootTask.setAddress(workerRuntime.getWorkerAddress());
|
||||
rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME);
|
||||
rootTask.setCreatedTime(System.currentTimeMillis());
|
||||
rootTask.setLastModifiedTime(System.currentTimeMillis());
|
||||
rootTask.setLastReportTime(-1L);
|
||||
rootTask.setSubInstanceId(instanceId);
|
||||
|
||||
if (taskPersistenceService.save(rootTask)) {
|
||||
if (submitTask(Lists.newArrayList(rootTask))) {
|
||||
log.info("[TaskTracker-{}] create root task successfully.", instanceId);
|
||||
} else {
|
||||
log.error("[TaskTracker-{}] create root task failed.", instanceId);
|
||||
@ -135,7 +129,7 @@ public class CommonTaskTracker extends HeavyTaskTracker {
|
||||
@SuppressWarnings("squid:S3776")
|
||||
private void innerRun() {
|
||||
|
||||
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
|
||||
InstanceTaskStatistics holder = getInstanceStatisticsHolder(instanceId);
|
||||
|
||||
long finishedNum = holder.getFinishedNum();
|
||||
long unfinishedNum = holder.getUnfinishedNum();
|
||||
|
@ -22,8 +22,8 @@ import tech.powerjob.worker.common.constants.TaskConstant;
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.common.utils.LRUCache;
|
||||
import tech.powerjob.worker.common.utils.TransportUtils;
|
||||
import tech.powerjob.worker.core.tracker.task.stat.InstanceStatisticsHolder;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.core.tracker.task.stat.InstanceTaskStatistics;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executors;
|
||||
@ -184,13 +184,11 @@ public class FrequentTaskTracker extends HeavyTaskTracker {
|
||||
newRootTask.setTaskId(taskId);
|
||||
|
||||
newRootTask.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
|
||||
newRootTask.setFailedCnt(0);
|
||||
|
||||
// 根任务总是默认本机执行
|
||||
newRootTask.setAddress(myAddress);
|
||||
newRootTask.setTaskName(TaskConstant.ROOT_TASK_NAME);
|
||||
newRootTask.setCreatedTime(System.currentTimeMillis());
|
||||
newRootTask.setLastModifiedTime(System.currentTimeMillis());
|
||||
newRootTask.setLastReportTime(-1L);
|
||||
|
||||
|
||||
// 判断是否超出最大执行实例数
|
||||
if (maxInstanceNum > 0) {
|
||||
@ -204,7 +202,7 @@ public class FrequentTaskTracker extends HeavyTaskTracker {
|
||||
}
|
||||
|
||||
// 必须先持久化,持久化成功才能 dispatch,否则会导致后续报错(因为DB中没有这个taskId对应的记录,会各种报错)
|
||||
if (!taskPersistenceService.save(newRootTask)) {
|
||||
if (!submitTask(Lists.newArrayList(newRootTask))) {
|
||||
log.error("[FQTaskTracker-{}] Launcher create new root task failed.", instanceId);
|
||||
processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED");
|
||||
return;
|
||||
@ -282,7 +280,7 @@ public class FrequentTaskTracker extends HeavyTaskTracker {
|
||||
}
|
||||
|
||||
// 查看执行情况
|
||||
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(subInstanceId);
|
||||
InstanceTaskStatistics holder = getInstanceStatisticsHolder(subInstanceId);
|
||||
|
||||
long finishedNum = holder.getFinishedNum();
|
||||
long unfinishedNum = holder.getUnfinishedNum();
|
||||
|
@ -9,6 +9,7 @@ import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import tech.powerjob.common.PowerJobDKey;
|
||||
import tech.powerjob.common.RemoteConstant;
|
||||
import tech.powerjob.common.enhance.SafeRunnable;
|
||||
import tech.powerjob.common.enums.ExecuteType;
|
||||
@ -28,9 +29,11 @@ import tech.powerjob.worker.common.utils.WorkflowContextUtils;
|
||||
import tech.powerjob.worker.core.ha.ProcessorTrackerStatusHolder;
|
||||
import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager;
|
||||
import tech.powerjob.worker.core.tracker.task.TaskTracker;
|
||||
import tech.powerjob.worker.core.tracker.task.stat.InstanceStatisticsHolder;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.TaskPersistenceService;
|
||||
import tech.powerjob.worker.core.tracker.task.stat.CommittedTaskStatistics;
|
||||
import tech.powerjob.worker.core.tracker.task.stat.ExternalTaskStatistics;
|
||||
import tech.powerjob.worker.core.tracker.task.stat.InstanceTaskStatistics;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
import tech.powerjob.worker.persistence.db.TaskPersistenceService;
|
||||
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
|
||||
import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq;
|
||||
@ -68,6 +71,16 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
||||
*/
|
||||
private final Cache<String, TaskBriefInfo> taskId2BriefInfo;
|
||||
|
||||
/**
|
||||
* 任务统计相关
|
||||
*/
|
||||
private final ExternalTaskStatistics externalTaskStatistics;
|
||||
private final CommittedTaskStatistics committedTaskStatistics;
|
||||
|
||||
/**
|
||||
* 运行时最大任务数量,超出 SWAP 到磁盘
|
||||
*/
|
||||
private final long maxActiveTaskNum;
|
||||
|
||||
/**
|
||||
* 分段锁
|
||||
@ -87,6 +100,11 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
||||
// 构建缓存
|
||||
taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).softValues().build();
|
||||
|
||||
// 初始化统计参数
|
||||
this.externalTaskStatistics = new ExternalTaskStatistics();
|
||||
this.committedTaskStatistics = new CommittedTaskStatistics();
|
||||
this.maxActiveTaskNum = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM, "100000"));
|
||||
|
||||
// 构建分段锁
|
||||
segmentLock = new SegmentLock(UPDATE_CONCURRENCY);
|
||||
|
||||
@ -275,16 +293,36 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
||||
}
|
||||
// 基础处理(多循环一次虽然有些浪费,但分布式执行中,这点耗时绝不是主要占比,忽略不计!)
|
||||
newTaskList.forEach(task -> {
|
||||
|
||||
// 秒级任务持久化的同时直接派发,有直接的状态
|
||||
if (task.getStatus() == null) {
|
||||
task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
|
||||
}
|
||||
task.setInstanceId(instanceId);
|
||||
task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
|
||||
task.setFailedCnt(0);
|
||||
task.setLastModifiedTime(System.currentTimeMillis());
|
||||
task.setCreatedTime(System.currentTimeMillis());
|
||||
task.setLastReportTime(-1L);
|
||||
});
|
||||
|
||||
log.debug("[TaskTracker-{}] receive new tasks: {}", instanceId, newTaskList);
|
||||
return taskPersistenceService.batchSave(newTaskList);
|
||||
long totalCommittedNum = committedTaskStatistics.getTotalCommittedNum();
|
||||
log.debug("[TaskTracker-{}] current committed num: {}, receive new tasks: {}", instanceId, totalCommittedNum, newTaskList);
|
||||
|
||||
boolean saveResult = true;
|
||||
if (totalCommittedNum > maxActiveTaskNum) {
|
||||
log.info("[TaskTracker-{}] totalCommittedNum({}) > maxActiveTaskNum({}), persist task to disk", instanceId, totalCommittedNum, maxActiveTaskNum);
|
||||
} else {
|
||||
saveResult = taskPersistenceService.batchSave(newTaskList);
|
||||
}
|
||||
|
||||
|
||||
if (saveResult) {
|
||||
committedTaskStatistics.getSucceedNum().add(newTaskList.size());
|
||||
} else {
|
||||
committedTaskStatistics.getFailedNum().add(newTaskList.size());
|
||||
log.error("[TaskTracker-{}] batchSave new tasks failed, please check the log", instanceId);
|
||||
}
|
||||
return saveResult;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -428,10 +466,10 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
||||
* @param subInstanceId 子任务实例ID
|
||||
* @return InstanceStatisticsHolder
|
||||
*/
|
||||
protected InstanceStatisticsHolder getInstanceStatisticsHolder(long subInstanceId) {
|
||||
protected InstanceTaskStatistics getInstanceStatisticsHolder(long subInstanceId) {
|
||||
|
||||
Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId);
|
||||
InstanceStatisticsHolder holder = new InstanceStatisticsHolder();
|
||||
InstanceTaskStatistics holder = new InstanceTaskStatistics();
|
||||
|
||||
holder.setWaitingDispatchNum(status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L));
|
||||
holder.setWorkerUnreceivedNum(status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L));
|
||||
|
@ -0,0 +1,33 @@
|
||||
package tech.powerjob.worker.core.tracker.task.stat;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
||||
/**
|
||||
* 已提交的任务数量
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/2/21
|
||||
*/
|
||||
@Data
|
||||
public class CommittedTaskStatistics implements Serializable {
|
||||
|
||||
/**
|
||||
* 提交成功的数量
|
||||
*/
|
||||
private LongAdder succeedNum = new LongAdder();
|
||||
/**
|
||||
* 提交失败的数量
|
||||
*/
|
||||
private LongAdder failedNum = new LongAdder();
|
||||
|
||||
/**
|
||||
* 获取全部的提交任务数量
|
||||
* @return 提交任务数量
|
||||
*/
|
||||
public long getTotalCommittedNum() {
|
||||
return succeedNum.sum() + failedNum.sum();
|
||||
}
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
package tech.powerjob.worker.core.tracker.task.stat;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
||||
/**
|
||||
* 外部任务(未持久化到运行时)统计
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/2/21
|
||||
*/
|
||||
@Data
|
||||
public class ExternalTaskStatistics implements Serializable {
|
||||
|
||||
/**
|
||||
* 等待交换进入的运行时的数量
|
||||
*/
|
||||
private LongAdder waitSwapInNum = new LongAdder();
|
||||
|
||||
/**
|
||||
* 运行成功,交换外部的数量
|
||||
*/
|
||||
private LongAdder succeedSwapOutNum = new LongAdder();
|
||||
|
||||
/**
|
||||
* 盖棺定论失败,交换到外部的数量
|
||||
*/
|
||||
private LongAdder failedSwapOutNum = new LongAdder();
|
||||
}
|
@ -11,7 +11,7 @@ import java.io.Serializable;
|
||||
* @since 2024/2/21
|
||||
*/
|
||||
@Data
|
||||
public class InstanceStatisticsHolder implements Serializable {
|
||||
public class InstanceTaskStatistics implements Serializable {
|
||||
|
||||
/**
|
||||
* 等待派发状态(仅存在 TaskTracker 数据库中)
|
@ -1,4 +1,4 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
package tech.powerjob.worker.persistence.db;
|
||||
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.common.utils.JavaUtils;
|
@ -1,4 +1,4 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
package tech.powerjob.worker.persistence.db;
|
||||
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.StringUtils;
|
@ -1,4 +1,4 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
package tech.powerjob.worker.persistence.db;
|
||||
|
||||
import tech.powerjob.worker.core.processor.TaskResult;
|
||||
|
@ -1,10 +1,9 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
package tech.powerjob.worker.persistence.db;
|
||||
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.core.processor.TaskResult;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.AllArgsConstructor;
|
||||
|
||||
import java.sql.*;
|
||||
import java.util.Collection;
|
@ -1,4 +1,4 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
package tech.powerjob.worker.persistence.db;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
@ -1,4 +1,4 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
package tech.powerjob.worker.persistence.db;
|
||||
|
||||
|
||||
import com.google.common.collect.Lists;
|
@ -0,0 +1,22 @@
|
||||
package tech.powerjob.worker.persistence.fs;
|
||||
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 外部任务持久化服务
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/2/22
|
||||
*/
|
||||
public interface ExternalTaskPersistenceService extends AutoCloseable {
|
||||
|
||||
boolean persistPendingTask(List<TaskDO> tasks);
|
||||
|
||||
List<TaskDO> readPendingTask();
|
||||
|
||||
boolean persistFinishedTask(List<TaskDO> tasks);
|
||||
|
||||
List<TaskDO> readFinishedTask();
|
||||
}
|
@ -0,0 +1,16 @@
|
||||
package tech.powerjob.worker.persistence.fs;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* FileSystemService
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/2/22
|
||||
*/
|
||||
public interface FsService extends AutoCloseable {
|
||||
|
||||
void writeLine(String content) throws IOException;
|
||||
|
||||
String readLine() throws IOException;
|
||||
}
|
@ -0,0 +1,101 @@
|
||||
package tech.powerjob.worker.persistence.fs.impl;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import tech.powerjob.common.serialize.JsonUtils;
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService;
|
||||
import tech.powerjob.worker.persistence.fs.FsService;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 外部文件存储服务
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/2/22
|
||||
*/
|
||||
@Slf4j
|
||||
public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPersistenceService {
|
||||
private final Long instanceId;
|
||||
private final Long subInstanceId;
|
||||
|
||||
private final FsService pendingFsService;
|
||||
|
||||
private final FsService resultFsService;
|
||||
|
||||
private static final String PENDING_FILE_NAME = "%d_%d-pending";
|
||||
private static final String RESULT_FILE_NAME = "%d_%d-result";
|
||||
|
||||
public ExternalTaskFileSystemPersistenceService(Long instanceId, Long subInstanceId) {
|
||||
this.instanceId = instanceId;
|
||||
this.subInstanceId = subInstanceId;
|
||||
|
||||
this.pendingFsService = new LocalDiskFsService(String.format(PENDING_FILE_NAME, instanceId, subInstanceId));
|
||||
this.resultFsService = new LocalDiskFsService(String.format(RESULT_FILE_NAME, instanceId, subInstanceId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean persistPendingTask(List<TaskDO> tasks) {
|
||||
try {
|
||||
String content = JsonUtils.toJSONString(tasks);
|
||||
pendingFsService.writeLine(content);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("[ExternalTaskPersistenceService] [{}-{}] persistPendingTask failed: {}", instanceId, subInstanceId, tasks);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public List<TaskDO> readPendingTask() {
|
||||
String pendingTaskStr = pendingFsService.readLine();
|
||||
TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class);
|
||||
if (taskDOS != null) {
|
||||
return Lists.newArrayList(taskDOS);
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean persistFinishedTask(List<TaskDO> tasks) {
|
||||
try {
|
||||
String content = JsonUtils.toJSONString(tasks);
|
||||
resultFsService.writeLine(content);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("[ExternalTaskPersistenceService] [{}-{}] persistPendingTask failed: {}", instanceId, subInstanceId, tasks);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public List<TaskDO> readFinishedTask() {
|
||||
String pendingTaskStr = resultFsService.readLine();
|
||||
TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class);
|
||||
if (taskDOS != null) {
|
||||
return Lists.newArrayList(taskDOS);
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
CommonUtils.executeIgnoreException(() -> {
|
||||
if (pendingFsService != null) {
|
||||
pendingFsService.close();
|
||||
}
|
||||
});
|
||||
|
||||
CommonUtils.executeIgnoreException(() -> {
|
||||
if (resultFsService != null) {
|
||||
resultFsService.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,73 @@
|
||||
package tech.powerjob.worker.persistence.fs.impl;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.worker.common.utils.PowerFileUtils;
|
||||
import tech.powerjob.worker.persistence.fs.FsService;
|
||||
|
||||
import java.io.*;
|
||||
|
||||
/**
|
||||
* 本地磁盘
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/2/22
|
||||
*/
|
||||
@Slf4j
|
||||
public class LocalDiskFsService implements FsService {
|
||||
|
||||
private static final String WORKSPACE_PATH = PowerFileUtils.workspace() + "/fs/" + CommonUtils.genUUID() + "/";
|
||||
|
||||
private static final String FILE_NAME_PATTERN = "%s.powerjob";
|
||||
|
||||
|
||||
private final BufferedWriter bufferedWriter;
|
||||
|
||||
private final BufferedReader bufferedReader;
|
||||
|
||||
@SneakyThrows
|
||||
public LocalDiskFsService(String keyword) {
|
||||
String fileName = String.format(FILE_NAME_PATTERN, keyword);
|
||||
String filePath = WORKSPACE_PATH.concat(fileName);
|
||||
|
||||
File file = new File(filePath);
|
||||
FileUtils.createParentDirectories(file);
|
||||
|
||||
// 在使用 BufferedReader 包装 FileReader 的情况下,不需要单独关闭 FileReader。当你调用 BufferedReader 的 close() 方法时,它会负责关闭它所包装的 FileReader。这是因为 BufferedReader.close() 方法内部会调用它所包装的流的 close() 方法,确保所有相关资源都被释放,包括底层的文件句柄
|
||||
FileWriter fileWriter = new FileWriter(file);
|
||||
this.bufferedWriter = new BufferedWriter(fileWriter);
|
||||
this.bufferedReader = new BufferedReader(new FileReader(file));
|
||||
|
||||
log.info("[LocalDiskFsService] new LocalDiskFsService successfully, path: {}", filePath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeLine(String content) throws IOException {
|
||||
bufferedWriter.write(content);
|
||||
bufferedWriter.newLine();
|
||||
bufferedWriter.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String readLine() throws IOException {
|
||||
return bufferedReader.readLine();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
|
||||
CommonUtils.executeIgnoreException(() -> {
|
||||
if (bufferedWriter != null) {
|
||||
bufferedWriter.close();
|
||||
}
|
||||
});
|
||||
|
||||
CommonUtils.executeIgnoreException(() -> {
|
||||
if (bufferedReader != null) {
|
||||
bufferedReader.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -3,7 +3,7 @@ package tech.powerjob.worker.pojo.request;
|
||||
import tech.powerjob.common.PowerSerializable;
|
||||
import tech.powerjob.worker.common.ThreadLocalStore;
|
||||
import tech.powerjob.common.serialize.SerializerUtils;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
@ -1,7 +1,7 @@
|
||||
package tech.powerjob.worker.pojo.request;
|
||||
|
||||
import tech.powerjob.common.PowerSerializable;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
import tech.powerjob.worker.pojo.model.InstanceInfo;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
@ -1,6 +1,7 @@
|
||||
package tech.powerjob.worker.persistence;
|
||||
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
|
@ -2,12 +2,12 @@ package tech.powerjob.worker.persistence;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.util.StopWatch;
|
||||
import tech.powerjob.worker.common.constants.StoreStrategy;
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.core.processor.TaskResult;
|
||||
import tech.powerjob.worker.persistence.db.*;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -3,14 +3,13 @@ package tech.powerjob.worker.persistence;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import tech.powerjob.worker.common.constants.StoreStrategy;
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.worker.core.processor.TaskResult;
|
||||
import tech.powerjob.worker.persistence.db.*;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.SQLIntegrityConstraintViolationException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -3,8 +3,8 @@ package tech.powerjob.worker.test;
|
||||
import tech.powerjob.worker.common.constants.StoreStrategy;
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
import tech.powerjob.common.utils.NetUtils;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.persistence.TaskPersistenceService;
|
||||
import tech.powerjob.worker.persistence.db.TaskDO;
|
||||
import tech.powerjob.worker.persistence.db.TaskPersistenceService;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.jupiter.api.*;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user