perf: use cached lock to replace SegmentLock

This commit is contained in:
Echo009 2022-04-27 19:56:48 +08:00
parent 0c4eb3834a
commit 5ed6eac38a
11 changed files with 169 additions and 63 deletions

View File

@ -1,5 +1,7 @@
package tech.powerjob.common.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -9,6 +11,7 @@ import java.util.concurrent.locks.ReentrantLock;
* @author tjq
* @since 2020/6/3
*/
@Slf4j
public class SegmentLock {
private final int mask;

View File

@ -6,7 +6,7 @@ import tech.powerjob.common.enums.*;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.instance.InstanceMetadataService;
import tech.powerjob.server.core.lock.UseSegmentLock;
import tech.powerjob.server.core.lock.UseCacheLock;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
@ -57,8 +57,8 @@ public class DispatchService {
* @param jobInfo 任务信息(注意:这里传入的任务信息有可能为空)
* @param instanceId 实例ID
*/
@UseSegmentLock(type = "redispatch", key = "#jobInfo.getId() ?: 0", concurrencyLevel = 16)
public void redispatch(JobInfoDO jobInfo, long instanceId) {
@UseCacheLock(type = "processJobInstance", key = "#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId", concurrencyLevel = 1024)
public void redispatch(JobInfoDO jobInfo, Long instanceId) {
InstanceInfoDO instance = instanceInfoRepository.findByInstanceId(instanceId);
// 将状态重置为等待派发
instance.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
@ -81,8 +81,8 @@ public class DispatchService {
* @param jobInfo 任务的元信息
* @param instanceId 任务实例ID
*/
@UseSegmentLock(type = "dispatch", key = "#jobInfo.getId() ?: 0", concurrencyLevel = 1024)
public void dispatch(JobInfoDO jobInfo, long instanceId) {
@UseCacheLock(type = "processJobInstance", key = "#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId", concurrencyLevel = 1024)
public void dispatch(JobInfoDO jobInfo, Long instanceId) {
// 检查当前任务是否被取消
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
Long jobId = instanceInfo.getJobId();

View File

@ -6,14 +6,15 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* use segment lock to make concurrent safe
* use cached lock to make concurrent safe
*
* @author tjq
* @author Echo009
* @since 1/16/21
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseSegmentLock {
public @interface UseCacheLock {
String type();

View File

@ -0,0 +1,65 @@
package tech.powerjob.server.core.lock;
import com.alibaba.fastjson.JSON;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import tech.powerjob.server.common.utils.AOPUtils;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* aspect for @UseSegmentLock
*
* @author tjq
* @since 1/16/21
*/
@Slf4j
@Aspect
@Component
@Order(1)
public class UseCacheLockAspect {
private final Map<String, Cache<String, ReentrantLock>> lockContainer = Maps.newConcurrentMap();
private static final long SLOW_THRESHOLD = 100;
@Around(value = "@annotation(useCacheLock))")
public Object execute(ProceedingJoinPoint point, UseCacheLock useCacheLock) throws Throwable {
Cache<String, ReentrantLock> lockCache = lockContainer.computeIfAbsent(useCacheLock.type(), ignore -> {
int concurrencyLevel = useCacheLock.concurrencyLevel();
log.info("[UseSegmentLockAspect] create Lock Cache for [{}] with concurrencyLevel: {}", useCacheLock.type(), concurrencyLevel);
return CacheBuilder.newBuilder()
.initialCapacity(300000)
.maximumSize(500000)
.concurrencyLevel(concurrencyLevel)
.expireAfterWrite(30, TimeUnit.MINUTES)
.build();
});
final Method method = AOPUtils.parseMethod(point);
Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);
final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);
long start = System.currentTimeMillis();
reentrantLock.lockInterruptibly();
try {
long timeCost = System.currentTimeMillis() - start;
if (timeCost > SLOW_THRESHOLD) {
log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost,
useCacheLock.key(),
JSON.toJSONString(point.getArgs()));
}
return point.proceed();
} finally {
reentrantLock.unlock();
}
}
}

View File

@ -1,45 +0,0 @@
package tech.powerjob.server.core.lock;
import tech.powerjob.common.utils.SegmentLock;
import org.springframework.core.annotation.Order;
import tech.powerjob.server.common.utils.AOPUtils;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* aspect for @UseSegmentLock
*
* @author tjq
* @since 1/16/21
*/
@Slf4j
@Aspect
@Component
@Order(1)
public class UseSegmentLockAspect {
private final Map<String, SegmentLock> lockStore = Maps.newConcurrentMap();
@Around(value = "@annotation(useSegmentLock))")
public Object execute(ProceedingJoinPoint point, UseSegmentLock useSegmentLock) throws Throwable {
SegmentLock segmentLock = lockStore.computeIfAbsent(useSegmentLock.type(), ignore -> {
int concurrencyLevel = useSegmentLock.concurrencyLevel();
log.info("[UseSegmentLockAspect] create SegmentLock for [{}] with concurrencyLevel: {}", useSegmentLock.type(), concurrencyLevel);
return new SegmentLock(concurrencyLevel);
});
int index = AOPUtils.parseSpEl(AOPUtils.parseMethod(point), point.getArgs(), useSegmentLock.key(), Integer.class, 1);
try {
segmentLock.lockInterruptibleSafe(index);
return point.proceed();
} finally {
segmentLock.unlock(index);
}
}
}

View File

@ -6,7 +6,7 @@ package tech.powerjob.server.core.uid;
* @author tjq
* @since 2020/4/6
*/
class SnowFlakeIdGenerator {
public class SnowFlakeIdGenerator {
/**
* 起始的时间戳(a special day for me)

View File

@ -19,7 +19,7 @@ import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.helper.StatusMappingHelper;
import tech.powerjob.server.core.lock.UseSegmentLock;
import tech.powerjob.server.core.lock.UseCacheLock;
import tech.powerjob.server.core.service.UserService;
import tech.powerjob.server.core.service.WorkflowNodeHandleService;
import tech.powerjob.server.core.uid.IdGenerateService;
@ -200,7 +200,7 @@ public class WorkflowInstanceManager {
* @param wfInfo 工作流任务信息
* @param wfInstanceId 工作流任务实例ID
*/
@UseSegmentLock(type = "startWfInstance", key = "#wfInfo.getId().intValue()", concurrencyLevel = 1024)
@UseCacheLock(type = "processWfInstance", key = "#wfInfo.getMaxWfInstanceNum() > 0 ? #wfInfo.getId() : #wfInstanceId", concurrencyLevel = 1024)
public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) {
Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
@ -272,7 +272,7 @@ public class WorkflowInstanceManager {
* @param result 完成任务的任务实例结果
*/
@SuppressWarnings({"squid:S3776", "squid:S2142", "squid:S1141"})
@UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
@UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024)
public void move(Long wfInstanceId, Long instanceId, InstanceStatus status, String result) {
Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
@ -386,7 +386,7 @@ public class WorkflowInstanceManager {
* @param appendedWfContextData 追加的上下文数据
* @since 2021/02/05
*/
@UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
@UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024)
public void updateWorkflowContext(Long wfInstanceId, Map<String, String> appendedWfContextData) {
try {

View File

@ -9,7 +9,7 @@ import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.response.WorkflowInstanceInfoDTO;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.lock.UseSegmentLock;
import tech.powerjob.server.core.lock.UseCacheLock;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
@ -74,7 +74,7 @@ public class WorkflowInstanceService {
* @param appId 所属应用ID
*/
@DesignateServer
@UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
@UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024)
public void stopWorkflowInstance(Long wfInstanceId, Long appId) {
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
@ -120,7 +120,7 @@ public class WorkflowInstanceService {
* @param appId 应用ID
*/
@DesignateServer
@UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
@UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024)
public void retryWorkflowInstance(Long wfInstanceId, Long appId) {
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
// 仅允许重试 失败的工作流
@ -189,7 +189,7 @@ public class WorkflowInstanceService {
* @param nodeId 节点 ID
*/
@DesignateServer
@UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
@UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024)
public void markNodeAsSuccess(Long appId, Long wfInstanceId, Long nodeId) {
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);

View File

@ -20,6 +20,7 @@ import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.text.ParseException;
import java.util.List;
import java.util.function.LongToDoubleFunction;
import java.util.stream.Collectors;
/**
@ -88,8 +89,11 @@ public class WorkflowController {
}
@GetMapping("/run")
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId) {
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, null, 0L));
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId,
@RequestParam(required = false,defaultValue = "0") Long delay,
@RequestParam(required = false) String initParams
) {
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, initParams, delay));
}
@GetMapping("/fetch")

View File

@ -0,0 +1,51 @@
package tech.powerjob.server.test;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.core.uid.SnowFlakeIdGenerator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
/**
* @author Echo009
* @since 2022/4/27
*/
@Slf4j
public class ConflictTest {
@Test
@SuppressWarnings("all")
@SneakyThrows
public void segmentLockMockTest() {
final SnowFlakeIdGenerator snowFlakeIdGenerator = new SnowFlakeIdGenerator(0, 4);
int len = CommonUtils.formatSize(1024) - 1;
Map<Integer, Integer> matchCount = new TreeMap<>();
int maxTime = 100000;
int expectedMaxConflict = maxTime / len;
for (int i = 0; i < maxTime; i++) {
final long id = snowFlakeIdGenerator.nextId();
// 这里模拟实际的请求间隔新建任务在 1k qps
Thread.sleep(1);
// int res = Long.valueOf(id).intValue() & len;
int res = String.valueOf(Long.valueOf(id).intValue()).hashCode() & len;
matchCount.merge(res, 1, Integer::sum);
}
final List<Map.Entry<Integer, Integer>> sorted = matchCount.entrySet().stream().sorted((a, b) -> b.getValue() - a.getValue()).collect(Collectors.toList());
// 10w qps 这里包括实例状态上报的请求最大冲突次数 407假设每个请求处理耗时 10 ms最大等待时长 4.07 s
log.info("expectedMaxConflict: {},actualMaxConflict: {}", expectedMaxConflict, sorted.get(0).getValue());
sorted.forEach(e -> {
log.info("index: {} -> conflict: {}", e.getKey(), e.getValue());
});
}
}

View File

@ -0,0 +1,27 @@
package tech.powerjob.samples.processors;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.log.OmsLogger;
/**
* @author Echo009
* @since 2022/4/27
*/
public class SimpleProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger logger = context.getOmsLogger();
String jobParams = context.getJobParams();
logger.info("Current context:{}", context.getWorkflowContext());
logger.info("Current job params:{}", jobParams);
return jobParams.contains("F") ? new ProcessResult(false) : new ProcessResult(true);
}
}