feat: add segment lock to fix the schedule concurrency bug #168

This commit is contained in:
tjq 2021-01-16 16:01:40 +08:00
parent 895e69f043
commit 1c52809ebb
4 changed files with 128 additions and 0 deletions

View File

@ -0,0 +1,63 @@
package com.github.kfcfans.powerjob.server.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import java.lang.reflect.Method;
/**
* AOP Utils
*
* @author tjq
* @since 1/16/21
*/
@Slf4j
public class AOPUtils {
private static final ExpressionParser parser = new SpelExpressionParser();
private static final ParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();
public static Method parseMethod(ProceedingJoinPoint joinPoint) {
Signature pointSignature = joinPoint.getSignature();
if (!(pointSignature instanceof MethodSignature)) {
throw new IllegalArgumentException("this annotation should be used on a method!");
}
MethodSignature signature = (MethodSignature) pointSignature;
Method method = signature.getMethod();
if (method.getDeclaringClass().isInterface()) {
try {
method = joinPoint.getTarget().getClass().getDeclaredMethod(pointSignature.getName(), method.getParameterTypes());
} catch (SecurityException | NoSuchMethodException e) {
ExceptionUtils.rethrow(e);
}
}
return method;
}
public static <T> T parseSpEl(Method method, Object[] arguments, String spEl, Class<T> clazz, T defaultResult) {
String[] params = discoverer.getParameterNames(method);
assert params != null;
EvaluationContext context = new StandardEvaluationContext();
for (int len = 0; len < params.length; len++) {
context.setVariable(params[len], arguments[len]);
}
try {
Expression expression = parser.parseExpression(spEl);
return expression.getValue(context, clazz);
} catch (Exception e) {
log.error("[AOPUtils] parse SpEL failed for method[{}], please concat @tjq to fix the bug!", method.getName(), e);
return defaultResult;
}
}
}

View File

@ -10,6 +10,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
import com.github.kfcfans.powerjob.server.service.instance.InstanceMetadataService;
import com.github.kfcfans.powerjob.server.service.lock.local.UseSegmentLock;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@ -46,6 +47,7 @@ public class DispatchService {
private static final Splitter commaSplitter = Splitter.on(",");
@UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024)
public void redispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) {
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
dispatch(jobInfo, instanceId, currentRunningTimes, instanceInfo.getInstanceParams(), instanceInfo.getWfInstanceId());
@ -59,6 +61,7 @@ public class DispatchService {
* @param instanceParams 实例的运行参数API触发方式专用
* @param wfInstanceId 工作流任务实例IDworkflow 任务专用
*/
@UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024)
public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes, String instanceParams, Long wfInstanceId) {
Long jobId = jobInfo.getId();
log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, instanceParams);

View File

@ -0,0 +1,23 @@
package com.github.kfcfans.powerjob.server.service.lock.local;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* use segment lock to make concurrent safe
*
* @author tjq
* @since 1/16/21
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseSegmentLock {
String type();
String key();
int concurrencyLevel();
}

View File

@ -0,0 +1,39 @@
package com.github.kfcfans.powerjob.server.service.lock.local;
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import com.github.kfcfans.powerjob.server.common.utils.AOPUtils;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Description
*
* @author tjq
* @since 1/16/21
*/
@Slf4j
@Aspect
@Component
public class UseSegmentLockAspect {
private final Map<String, SegmentLock> lockStore = Maps.newConcurrentMap();
@Around(value = "@annotation(useSegmentLock))")
public Object execute(ProceedingJoinPoint point, UseSegmentLock useSegmentLock) throws Throwable {
SegmentLock segmentLock = lockStore.computeIfAbsent(useSegmentLock.type(), ignore -> {
int concurrencyLevel = useSegmentLock.concurrencyLevel();
log.info("[UseSegmentLockAspect] create SegmentLock for [{}] with concurrencyLevel: {}", useSegmentLock.type(), concurrencyLevel);
return new SegmentLock(concurrencyLevel);
});
int index = AOPUtils.parseSpEl(AOPUtils.parseMethod(point), point.getArgs(), useSegmentLock.key(), Integer.class, 1);
segmentLock.lockInterruptibleSafe(index);
return point.proceed();
}
}