From e405e283ad7f97b0b4e5d369c7de884c0caf9192 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 30 Apr 2020 09:32:43 +0800 Subject: [PATCH] redesign TaskTracker's lock solutions --- .../worker/core/tracker/task/TaskTracker.java | 28 +++++++++++++++---- .../persistence/TaskPersistenceService.java | 2 +- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index e7ac59bb..d1ca0d9f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -35,6 +35,7 @@ import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; /** * 负责管理 JobInstance 的运行,主要包括任务的派发(MR可能存在大量的任务)和状态的更新 @@ -61,6 +62,11 @@ public abstract class TaskTracker { protected AtomicBoolean finished; // 上报时间缓存 private Cache taskId2LastReportTime; + // 分段锁 + private ReentrantLock[] locks = new ReentrantLock[UPDATE_CONCURRENCY]; + + private static final int UPDATE_CONCURRENCY = 8; + private static final int UPDATE_LOCK_MASK = UPDATE_CONCURRENCY - 1; protected TaskTracker(ServerScheduleJobReq req) { @@ -84,6 +90,11 @@ public abstract class TaskTracker { // 构建缓存 taskId2LastReportTime = CacheBuilder.newBuilder().maximumSize(1024).build(); + // 构建分段锁 + for (int i = 0; i < UPDATE_CONCURRENCY; i++) { + locks[i] = new ReentrantLock(); + } + // 子类自定义初始化操作 initTaskTracker(req); @@ -107,6 +118,7 @@ public abstract class TaskTracker { /* *************************** 对外方法区 *************************** */ /** * 更新Task状态 + * V1.0.0 -> V1.0.1 锁方案变更,从 synchronized (taskId.intern()) 修改为分段锁,能大大减少内存占用,损失的只有理论并发度而已 * @param taskId task的ID(task为任务实例的执行单位) * @param newStatus task的新状态 * @param reportTime 上报时间 @@ -114,12 +126,13 @@ public abstract class TaskTracker { */ public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) { - TaskStatus nTaskStatus = TaskStatus.of(newStatus); + ReentrantLock lock = locks[taskId.hashCode() & UPDATE_LOCK_MASK]; - // 同一个task,串行执行 - // 需要保证 worker 的其他代码没有用 taskId 或者 String 作为锁...否则就等着找bug吧...(主要是不舍得加前缀,这用的可以常量池内存啊...) - // taskId其实是可能重复的(同一台机器上多个 TaskTracker...不过真实冲突概率较低,就算冲突了也问题不大,忽略) - synchronized (taskId.intern()) { + TaskStatus nTaskStatus = TaskStatus.of(newStatus); + try { + + // 阻塞获取锁 + lock.lock(); Long lastReportTime = taskId2LastReportTime.getIfPresent(taskId); @@ -129,7 +142,7 @@ public abstract class TaskTracker { if (taskOpt.isPresent()) { lastReportTime = taskOpt.get().getLastReportTime(); }else { - // 理论上不存在这种情况 + // 理论上不存在这种情况,除非数据库异常 log.error("[TaskTracker-{}] can't find task by pkey(instanceId={}&taskId={}).", instanceId, instanceId, taskId); } @@ -193,6 +206,9 @@ public abstract class TaskTracker { if (!updateResult) { log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId); } + + }finally { + lock.unlock(); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index 006a82c1..c2335235 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -28,7 +28,7 @@ public class TaskPersistenceService { // 默认重试参数 private static final int RETRY_TIMES = 3; - private static final long RETRY_INTERVAL_MS = 200; + private static final long RETRY_INTERVAL_MS = 100; private static volatile boolean initialized = false; public static TaskPersistenceService INSTANCE = new TaskPersistenceService();