mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
Redesign TaskTracker's locking scheme (replace synchronized (taskId.intern()) with striped locks)
This commit is contained in:
parent
05206cd43d
commit
e405e283ad
@ -35,6 +35,7 @@ import java.util.Optional;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* 负责管理 JobInstance 的运行,主要包括任务的派发(MR可能存在大量的任务)和状态的更新
|
||||
@ -61,6 +62,11 @@ public abstract class TaskTracker {
|
||||
protected AtomicBoolean finished;
|
||||
// 上报时间缓存
|
||||
private Cache<String, Long> taskId2LastReportTime;
|
||||
// 分段锁
|
||||
private ReentrantLock[] locks = new ReentrantLock[UPDATE_CONCURRENCY];
|
||||
|
||||
private static final int UPDATE_CONCURRENCY = 8;
|
||||
private static final int UPDATE_LOCK_MASK = UPDATE_CONCURRENCY - 1;
|
||||
|
||||
protected TaskTracker(ServerScheduleJobReq req) {
|
||||
|
||||
@ -84,6 +90,11 @@ public abstract class TaskTracker {
|
||||
// 构建缓存
|
||||
taskId2LastReportTime = CacheBuilder.newBuilder().maximumSize(1024).build();
|
||||
|
||||
// 构建分段锁
|
||||
for (int i = 0; i < UPDATE_CONCURRENCY; i++) {
|
||||
locks[i] = new ReentrantLock();
|
||||
}
|
||||
|
||||
// 子类自定义初始化操作
|
||||
initTaskTracker(req);
|
||||
|
||||
@ -107,6 +118,7 @@ public abstract class TaskTracker {
|
||||
/* *************************** 对外方法区 *************************** */
|
||||
/**
|
||||
* 更新Task状态
|
||||
* V1.0.0 -> V1.0.1 锁方案变更,从 synchronized (taskId.intern()) 修改为分段锁,能大大减少内存占用,损失的只有理论并发度而已
|
||||
* @param taskId task的ID(task为任务实例的执行单位)
|
||||
* @param newStatus task的新状态
|
||||
* @param reportTime 上报时间
|
||||
@ -114,12 +126,13 @@ public abstract class TaskTracker {
|
||||
*/
|
||||
public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) {
|
||||
|
||||
TaskStatus nTaskStatus = TaskStatus.of(newStatus);
|
||||
ReentrantLock lock = locks[taskId.hashCode() & UPDATE_LOCK_MASK];
|
||||
|
||||
// 同一个task,串行执行
|
||||
// 需要保证 worker 的其他代码没有用 taskId 或者 String 作为锁...否则就等着找bug吧...(主要是不舍得加前缀,这用的可以常量池内存啊...)
|
||||
// taskId其实是可能重复的(同一台机器上多个 TaskTracker...不过真实冲突概率较低,就算冲突了也问题不大,忽略)
|
||||
synchronized (taskId.intern()) {
|
||||
TaskStatus nTaskStatus = TaskStatus.of(newStatus);
|
||||
try {
|
||||
|
||||
// 阻塞获取锁
|
||||
lock.lock();
|
||||
|
||||
Long lastReportTime = taskId2LastReportTime.getIfPresent(taskId);
|
||||
|
||||
@ -129,7 +142,7 @@ public abstract class TaskTracker {
|
||||
if (taskOpt.isPresent()) {
|
||||
lastReportTime = taskOpt.get().getLastReportTime();
|
||||
}else {
|
||||
// 理论上不存在这种情况
|
||||
// 理论上不存在这种情况,除非数据库异常
|
||||
log.error("[TaskTracker-{}] can't find task by pkey(instanceId={}&taskId={}).", instanceId, instanceId, taskId);
|
||||
}
|
||||
|
||||
@ -193,6 +206,9 @@ public abstract class TaskTracker {
|
||||
if (!updateResult) {
|
||||
log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId);
|
||||
}
|
||||
|
||||
}finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -28,7 +28,7 @@ public class TaskPersistenceService {
|
||||
|
||||
// 默认重试参数
|
||||
private static final int RETRY_TIMES = 3;
|
||||
private static final long RETRY_INTERVAL_MS = 200;
|
||||
private static final long RETRY_INTERVAL_MS = 100;
|
||||
|
||||
private static volatile boolean initialized = false;
|
||||
public static TaskPersistenceService INSTANCE = new TaskPersistenceService();
|
||||
|
Loading…
x
Reference in New Issue
Block a user