Merge branch 'pr-vannewang-master' into 4.3.3

This commit is contained in:
tjq 2023-05-07 22:29:36 +08:00
commit 8ecc5768c7
7 changed files with 245 additions and 47 deletions

View File

@ -0,0 +1,25 @@
package tech.powerjob.samples.tester;
import org.springframework.stereotype.Component;
import tech.powerjob.worker.annotation.PowerJob;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.log.OmsLogger;
@Component
public class SpringMethodProcessorService {
@PowerJob("test")
public String test(TaskContext context) {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.warn("测试日志");
return null;
}
@PowerJob("test1")
public String test1(TaskContext context) {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.warn("测试日志");
return "测试日志";
}
}

View File

@ -8,6 +8,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.extension.processor.ProcessorFactory;
import tech.powerjob.worker.processor.impl.BuildInSpringMethodProcessorFactory;
import tech.powerjob.worker.processor.impl.BuiltInSpringProcessorFactory;
import java.util.Collections;
@ -43,12 +44,14 @@ public class PowerJobSpringWorker implements ApplicationContextAware, Initializi
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
BuiltInSpringProcessorFactory springProcessorFactory = new BuiltInSpringProcessorFactory(applicationContext);
BuildInSpringMethodProcessorFactory springMethodProcessorFactory = new BuildInSpringMethodProcessorFactory(applicationContext);
// append BuiltInSpringProcessorFactory
List<ProcessorFactory> processorFactories = Lists.newArrayList(
Optional.ofNullable(config.getProcessorFactoryList())
.orElse(Collections.emptyList()));
processorFactories.add(springProcessorFactory);
processorFactories.add(springMethodProcessorFactory);
config.setProcessorFactoryList(processorFactories);
}

View File

@ -0,0 +1,22 @@
package tech.powerjob.worker.annotation;
import java.lang.annotation.*;
/**
* 方法级别的power-job调度
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface PowerJob {
/**
* handler name
*/
String value();
}

View File

@ -0,0 +1,26 @@
package tech.powerjob.worker.processor;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import java.lang.reflect.Method;
public class MethodBasicProcessor implements BasicProcessor {
private final Object bean;
private final Method method;
public MethodBasicProcessor(Object bean, Method method) {
this.bean = bean;
this.method = method;
}
@Override
public ProcessResult process(TaskContext context) throws Exception {
Object result = method.invoke(bean, context);
return new ProcessResult(true, JsonUtils.toJSONString(result));
}
}

View File

@ -0,0 +1,65 @@
package tech.powerjob.worker.processor.impl;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.worker.extension.processor.ProcessorFactory;
import java.util.Set;
@Slf4j
public abstract class AbstractBuildInSpringProcessorFactory implements ProcessorFactory {
protected final ApplicationContext applicationContext;
protected AbstractBuildInSpringProcessorFactory(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
public Set<String> supportTypes() {
return Sets.newHashSet(ProcessorType.BUILT_IN.name());
}
protected boolean checkCanLoad() {
try {
ApplicationContext.class.getClassLoader();
return applicationContext != null;
} catch (Throwable ignore) {
}
return false;
}
@SuppressWarnings("unchecked")
protected static <T> T getBean(String className, ApplicationContext ctx) throws Exception {
// 0. 尝试直接用 Bean 名称加载
try {
final Object bean = ctx.getBean(className);
if (bean != null) {
return (T) bean;
}
} catch (Exception ignore) {
}
// 1. ClassLoader 存在则直接使用 clz 加载
ClassLoader classLoader = ctx.getClassLoader();
if (classLoader != null) {
return (T) ctx.getBean(classLoader.loadClass(className));
}
// 2. ClassLoader 不存在(系统类加载器不可见)尝试用类名称小写加载
String[] split = className.split("\\.");
String beanName = split[split.length - 1];
// 小写转大写
char[] cs = beanName.toCharArray();
cs[0] += 32;
String beanName0 = String.valueOf(cs);
log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", ctx, beanName0);
return (T) ctx.getBean(beanName0);
}
}

View File

@ -0,0 +1,96 @@
package tech.powerjob.worker.processor.impl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.context.ApplicationContext;
import tech.powerjob.worker.annotation.PowerJob;
import tech.powerjob.worker.extension.processor.ProcessorBean;
import tech.powerjob.worker.extension.processor.ProcessorDefinition;
import tech.powerjob.worker.processor.MethodBasicProcessor;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
/**
* 内建的 SpringBean 处理器工厂用于加载 Spring 管理Bean下的方法使用PowerJob注解非核心依赖
*
* @author wxp
* @since 2023/4/06
*/
@Slf4j
public class BuildInSpringMethodProcessorFactory extends AbstractBuildInSpringProcessorFactory {
private static final List<String> jobHandlerRepository = new LinkedList<>();
private final static String DELIMITER = "#";
public BuildInSpringMethodProcessorFactory(ApplicationContext applicationContext) {
super(applicationContext);
}
@Override
public ProcessorBean build(ProcessorDefinition processorDefinition) {
try {
boolean canLoad = checkCanLoad();
if (!canLoad) {
log.info("[ProcessorFactory] can't find Spring env, this processor can't load by 'BuildInSpringMethodProcessorFactory'");
return null;
}
String processorInfo = processorDefinition.getProcessorInfo();
if (!processorInfo.contains(DELIMITER)) {
log.info("[ProcessorFactory] can't parse processorDefinition, this processor can't load by 'BuildInSpringMethodProcessorFactory'");
return null;
}
String[] split = processorInfo.split("#");
String methodName = split[1];
String className = split[0];
Object bean = getBean(className,applicationContext);
Method[] methods = bean.getClass().getDeclaredMethods();
for (Method method : methods) {
PowerJob powerJob = method.getAnnotation(PowerJob.class);
if (powerJob == null) {
continue;
}
String name = powerJob.value();
//匹配到和页面定义相同的methodName
if (!name.equals(methodName)) {
continue;
}
if (name.trim().length() == 0) {
throw new RuntimeException("method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
if (containsJobHandler(name)) {
throw new RuntimeException("jobhandler[" + name + "] naming conflicts.");
}
method.setAccessible(true);
registerJobHandler(methodName);
MethodBasicProcessor processor = new MethodBasicProcessor(bean, method);
return new ProcessorBean()
.setProcessor(processor)
.setClassLoader(processor.getClass().getClassLoader());
}
} catch (NoSuchBeanDefinitionException ignore) {
log.warn("[ProcessorFactory] can't find the processor in SPRING");
} catch (Throwable t) {
log.warn("[ProcessorFactory] load by BuiltInSpringProcessorFactory failed. If you are using Spring, make sure this bean was managed by Spring", t);
}
return null;
}
public static void registerJobHandler(String name) {
jobHandlerRepository.add(name);
}
private boolean containsJobHandler(String name) {
return jobHandlerRepository.contains(name);
}
}

View File

@ -19,17 +19,11 @@ import java.util.Set;
* @since 2023/1/17
*/
@Slf4j
public class BuiltInSpringProcessorFactory implements ProcessorFactory {
public class BuiltInSpringProcessorFactory extends AbstractBuildInSpringProcessorFactory {
private final ApplicationContext applicationContext;
public BuiltInSpringProcessorFactory(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
public Set<String> supportTypes() {
return Sets.newHashSet(ProcessorType.BUILT_IN.name());
super(applicationContext);
}
@Override
@ -41,8 +35,12 @@ public class BuiltInSpringProcessorFactory implements ProcessorFactory {
log.info("[ProcessorFactory] can't find Spring env, this processor can't load by 'BuiltInSpringProcessorFactory'");
return null;
}
BasicProcessor basicProcessor = getBean(processorDefinition.getProcessorInfo(), applicationContext);
String processorInfo = processorDefinition.getProcessorInfo();
//用于区分方法级别的参数
if (processorInfo.contains("#")) {
return null;
}
BasicProcessor basicProcessor = getBean(processorInfo, applicationContext);
return new ProcessorBean()
.setProcessor(basicProcessor)
.setClassLoader(basicProcessor.getClass().getClassLoader());
@ -55,42 +53,5 @@ public class BuiltInSpringProcessorFactory implements ProcessorFactory {
return null;
}
private boolean checkCanLoad() {
try {
ApplicationContext.class.getClassLoader();
return applicationContext != null;
} catch (Throwable ignore) {
}
return false;
}
@SuppressWarnings("unchecked")
private static <T> T getBean(String className, ApplicationContext ctx) throws Exception {
// 0. 尝试直接用 Bean 名称加载
try {
final Object bean = ctx.getBean(className);
if (bean != null) {
return (T) bean;
}
} catch (Exception ignore) {
}
// 1. ClassLoader 存在则直接使用 clz 加载
ClassLoader classLoader = ctx.getClassLoader();
if (classLoader != null) {
return (T) ctx.getBean(classLoader.loadClass(className));
}
// 2. ClassLoader 不存在(系统类加载器不可见)尝试用类名称小写加载
String[] split = className.split("\\.");
String beanName = split[split.length - 1];
// 小写转大写
char[] cs = beanName.toCharArray();
cs[0] += 32;
String beanName0 = String.valueOf(cs);
log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", ctx, beanName0);
return (T) ctx.getBean(beanName0);
}
}