diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/AOPUtils.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/AOPUtils.java new file mode 100644 index 00000000..2f0b5067 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/AOPUtils.java @@ -0,0 +1,63 @@ +package com.github.kfcfans.powerjob.server.common.utils; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.Signature; +import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.core.LocalVariableTableParameterNameDiscoverer; +import org.springframework.core.ParameterNameDiscoverer; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; + +import java.lang.reflect.Method; + +/** + * AOP Utils + * + * @author tjq + * @since 1/16/21 + */ +@Slf4j +public class AOPUtils { + + private static final ExpressionParser parser = new SpelExpressionParser(); + private static final ParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer(); + + public static Method parseMethod(ProceedingJoinPoint joinPoint) { + Signature pointSignature = joinPoint.getSignature(); + if (!(pointSignature instanceof MethodSignature)) { + throw new IllegalArgumentException("this annotation should be used on a method!"); + } + MethodSignature signature = (MethodSignature) pointSignature; + Method method = signature.getMethod(); + if (method.getDeclaringClass().isInterface()) { + try { + method = joinPoint.getTarget().getClass().getDeclaredMethod(pointSignature.getName(), method.getParameterTypes()); + } catch (SecurityException | NoSuchMethodException e) { + ExceptionUtils.rethrow(e); + } + } + return method; + } + + public static T parseSpEl(Method method, Object[] arguments, String spEl, Class clazz, T defaultResult) { + String[] params = discoverer.getParameterNames(method); + assert params != null; + + EvaluationContext context = new StandardEvaluationContext(); + for (int len = 0; len < params.length; len++) { + context.setVariable(params[len], arguments[len]); + } + try { + Expression expression = parser.parseExpression(spEl); + return expression.getValue(context, clazz); + } catch (Exception e) { + log.error("[AOPUtils] parse SpEL failed for method[{}], please concat @tjq to fix the bug!", method.getName(), e); + return defaultResult; + } + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index 6399a72f..99733512 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -10,6 +10,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; import com.github.kfcfans.powerjob.server.service.instance.InstanceMetadataService; +import com.github.kfcfans.powerjob.server.service.lock.local.UseSegmentLock; import com.google.common.base.Splitter; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -46,6 +47,7 @@ public class DispatchService { private static final Splitter commaSplitter = Splitter.on(","); + @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024) public void redispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) { InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); dispatch(jobInfo, instanceId, currentRunningTimes, instanceInfo.getInstanceParams(), instanceInfo.getWfInstanceId()); @@ -59,6 +61,7 @@ public class DispatchService { * @param instanceParams 实例的运行参数,API触发方式专用 * @param wfInstanceId 工作流任务实例ID,workflow 任务专用 */ + @UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024) public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes, String instanceParams, Long wfInstanceId) { Long jobId = jobInfo.getId(); log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, instanceParams); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/local/UseSegmentLock.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/local/UseSegmentLock.java new file mode 100644 index 00000000..d550d476 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/local/UseSegmentLock.java @@ -0,0 +1,23 @@ +package com.github.kfcfans.powerjob.server.service.lock.local; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * use segment lock to make concurrent safe + * + * @author tjq + * @since 1/16/21 + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface UseSegmentLock { + + String type(); + + String key(); + + int concurrencyLevel(); +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/local/UseSegmentLockAspect.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/local/UseSegmentLockAspect.java new file mode 100644 index 00000000..24215406 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/local/UseSegmentLockAspect.java @@ -0,0 +1,39 @@ +package com.github.kfcfans.powerjob.server.service.lock.local; + +import com.github.kfcfans.powerjob.common.utils.SegmentLock; +import com.github.kfcfans.powerjob.server.common.utils.AOPUtils; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * Description + * + * @author tjq + * @since 1/16/21 + */ +@Slf4j +@Aspect +@Component +public class UseSegmentLockAspect { + + private final Map lockStore = Maps.newConcurrentMap(); + + @Around(value = "@annotation(useSegmentLock))") + public Object execute(ProceedingJoinPoint point, UseSegmentLock useSegmentLock) throws Throwable { + SegmentLock segmentLock = lockStore.computeIfAbsent(useSegmentLock.type(), ignore -> { + int concurrencyLevel = useSegmentLock.concurrencyLevel(); + log.info("[UseSegmentLockAspect] create SegmentLock for [{}] with concurrencyLevel: {}", useSegmentLock.type(), concurrencyLevel); + return new SegmentLock(concurrencyLevel); + }); + + int index = AOPUtils.parseSpEl(AOPUtils.parseMethod(point), point.getArgs(), useSegmentLock.key(), Integer.class, 1); + segmentLock.lockInterruptibleSafe(index); + return point.proceed(); + } +}