重现构思,推倒重来!

This commit is contained in:
tjq 2024-02-23 22:58:14 +08:00
parent 53025d6bb1
commit fe6c888507
6 changed files with 83 additions and 22 deletions

View File

@ -92,14 +92,16 @@ public class CommonUtils {
public static void executeIgnoreException(SupplierPlus<?> executor) {
try {
executor.get();
}catch (Exception ignore) {
}catch (Exception e) {
log.warn("executeIgnoreException but exception!", e);
}
}
public static void executeIgnoreException(Meaningless executor) {
try {
executor.m();
}catch (Exception ignore) {
}catch (Exception e) {
log.warn("executeIgnoreException but exception!", e);
}
}

View File

@ -34,6 +34,8 @@ import tech.powerjob.worker.core.tracker.task.stat.ExternalTaskStatistics;
import tech.powerjob.worker.core.tracker.task.stat.InstanceTaskStatistics;
import tech.powerjob.worker.persistence.db.TaskDO;
import tech.powerjob.worker.persistence.db.TaskPersistenceService;
import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService;
import tech.powerjob.worker.persistence.fs.impl.ExternalTaskFileSystemPersistenceService;
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq;
@ -62,6 +64,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
* 数据库持久化服务
*/
protected final TaskPersistenceService taskPersistenceService;
protected ExternalTaskPersistenceService externalTaskPersistenceService;
/**
* 定时任务线程池
*/
@ -308,14 +311,13 @@ public abstract class HeavyTaskTracker extends TaskTracker {
long totalCommittedNum = committedTaskStatistics.getTotalCommittedNum();
log.debug("[TaskTracker-{}] current committed num: {}, receive new tasks: {}", instanceId, totalCommittedNum, newTaskList);
boolean saveResult = true;
boolean saveResult;
if (totalCommittedNum > maxActiveTaskNum) {
log.info("[TaskTracker-{}] totalCommittedNum({}) > maxActiveTaskNum({}), persist task to disk", instanceId, totalCommittedNum, maxActiveTaskNum);
saveResult = getExternalTaskPersistenceService().persistPendingTask(newTaskList);
} else {
saveResult = taskPersistenceService.batchSave(newTaskList);
}
if (saveResult) {
committedTaskStatistics.getSucceedNum().add(newTaskList.size());
} else {
@ -574,6 +576,14 @@ public abstract class HeavyTaskTracker extends TaskTracker {
}
}
protected class YuGong extends SafeRunnable {
@Override
protected void run0() {
}
}
@Data
@AllArgsConstructor
protected static class TaskBriefInfo {
@ -591,4 +601,21 @@ public abstract class HeavyTaskTracker extends TaskTracker {
* @param req 服务器调度任务实例运行请求
*/
protected abstract void initTaskTracker(ServerScheduleJobReq req);
protected ExternalTaskPersistenceService getExternalTaskPersistenceService() {
if (externalTaskPersistenceService != null) {
return externalTaskPersistenceService;
}
synchronized (this) {
if (externalTaskPersistenceService != null) {
return externalTaskPersistenceService;
}
boolean needResult = ExecuteType.MAP_REDUCE.equals(executeType);
log.info("[TaskTracker-{}] totalCommittedNum({}) > maxActiveTaskNum({}), start to use ExternalTaskPersistenceService", instanceId, committedTaskStatistics.getTotalCommittedNum(), maxActiveTaskNum);
externalTaskPersistenceService = new ExternalTaskFileSystemPersistenceService(instanceId, needResult);
return externalTaskPersistenceService;
}
}
}

View File

@ -2,6 +2,7 @@ package tech.powerjob.worker.persistence.fs;
import tech.powerjob.worker.persistence.db.TaskDO;
import java.io.Closeable;
import java.util.List;
/**
@ -10,7 +11,7 @@ import java.util.List;
* @author tjq
* @since 2024/2/22
*/
public interface ExternalTaskPersistenceService extends AutoCloseable {
public interface ExternalTaskPersistenceService extends Closeable {
boolean persistPendingTask(List<TaskDO> tasks);

View File

@ -1,5 +1,6 @@
package tech.powerjob.worker.persistence.fs;
import java.io.Closeable;
import java.io.IOException;
/**
@ -8,7 +9,7 @@ import java.io.IOException;
* @author tjq
* @since 2024/2/22
*/
public interface FsService extends AutoCloseable {
public interface FsService extends Closeable {
void writeLine(String content) throws IOException;

View File

@ -9,6 +9,7 @@ import tech.powerjob.worker.persistence.db.TaskDO;
import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService;
import tech.powerjob.worker.persistence.fs.FsService;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@ -21,21 +22,35 @@ import java.util.List;
@Slf4j
public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPersistenceService {
private final Long instanceId;
private final Long subInstanceId;
private final FsService pendingFsService;
private final FsService resultFsService;
private static final String PENDING_FILE_NAME = "%d_%d-pending";
private static final String RESULT_FILE_NAME = "%d_%d-result";
private static final String PENDING_FILE_NAME = "%d-pending";
private static final String RESULT_FILE_NAME = "%d-result";
public ExternalTaskFileSystemPersistenceService(Long instanceId, Long subInstanceId) {
public ExternalTaskFileSystemPersistenceService(Long instanceId, boolean needResult) {
this.instanceId = instanceId;
this.subInstanceId = subInstanceId;
this.pendingFsService = new LocalDiskFsService(String.format(PENDING_FILE_NAME, instanceId, subInstanceId));
this.resultFsService = new LocalDiskFsService(String.format(RESULT_FILE_NAME, instanceId, subInstanceId));
this.pendingFsService = new LocalDiskFsService(String.format(PENDING_FILE_NAME, instanceId));
if (needResult) {
this.resultFsService = new LocalDiskFsService(String.format(RESULT_FILE_NAME, instanceId));
} else {
this.resultFsService = new FsService() {
@Override
public void writeLine(String content) throws IOException {
}
@Override
public String readLine() throws IOException {
return null;
}
@Override
public void close() {
}
};
}
}
@Override
@ -45,7 +60,7 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer
pendingFsService.writeLine(content);
return true;
} catch (Exception e) {
log.error("[ExternalTaskPersistenceService] [{}-{}] persistPendingTask failed: {}", instanceId, subInstanceId, tasks);
log.error("[ExternalTaskPersistenceService] [{}] persistPendingTask failed: {}", instanceId, tasks);
}
return false;
}
@ -68,7 +83,7 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer
resultFsService.writeLine(content);
return true;
} catch (Exception e) {
log.error("[ExternalTaskPersistenceService] [{}-{}] persistPendingTask failed: {}", instanceId, subInstanceId, tasks);
log.error("[ExternalTaskPersistenceService] [{}] persistPendingTask failed: {}", instanceId, tasks);
}
return false;
}
@ -85,7 +100,7 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer
}
@Override
public void close() throws Exception {
public void close() {
CommonUtils.executeIgnoreException(() -> {
if (pendingFsService != null) {
pendingFsService.close();

View File

@ -22,7 +22,7 @@ public class LocalDiskFsService implements FsService {
private static final String FILE_NAME_PATTERN = "%s.powerjob";
private final File file;
private final BufferedWriter bufferedWriter;
private final BufferedReader bufferedReader;
@ -32,7 +32,7 @@ public class LocalDiskFsService implements FsService {
String fileName = String.format(FILE_NAME_PATTERN, keyword);
String filePath = WORKSPACE_PATH.concat(fileName);
File file = new File(filePath);
this.file = new File(filePath);
FileUtils.createParentDirectories(file);
// 在使用 BufferedReader 包装 FileReader 的情况下不需要单独关闭 FileReader当你调用 BufferedReader close() 方法时它会负责关闭它所包装的 FileReader这是因为 BufferedReader.close() 方法内部会调用它所包装的流的 close() 方法确保所有相关资源都被释放包括底层的文件句柄
@ -43,20 +43,30 @@ public class LocalDiskFsService implements FsService {
log.info("[LocalDiskFsService] new LocalDiskFsService successfully, path: {}", filePath);
}
/**
* 按行写数据线程不安全考虑到此处不用太在意性能直接 synchronized
* @param content 内容
* @throws IOException 异常
*/
@Override
public void writeLine(String content) throws IOException {
public synchronized void writeLine(String content) throws IOException {
bufferedWriter.write(content);
bufferedWriter.newLine();
bufferedWriter.flush();
}
/**
* 按行读数据线程不安全考虑到此处不用太在意性能直接 synchronized
* @return 内容
* @throws IOException 异常
*/
@Override
public String readLine() throws IOException {
public synchronized String readLine() throws IOException {
return bufferedReader.readLine();
}
@Override
public void close() throws Exception {
public void close() {
CommonUtils.executeIgnoreException(() -> {
if (bufferedWriter != null) {
@ -69,5 +79,10 @@ public class LocalDiskFsService implements FsService {
bufferedReader.close();
}
});
CommonUtils.executeIgnoreException(() -> {
boolean delete = file.delete();
log.info("[LocalDiskFsService] delete file[{}] result: {}", file, delete);
});
}
}