diff --git a/pom.xml b/pom.xml index db1df523..f946cf27 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ tech.powerjob powerjob - 4.3.2 + 4.3.3 pom powerjob http://www.powerjob.tech diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index 1259b8d9..dfacf993 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -5,18 +5,18 @@ powerjob tech.powerjob - 4.3.2 + 4.3.3 4.0.0 powerjob-client - 4.3.2 + 4.3.3 jar 5.9.1 1.2.83 - 4.3.2 + 4.3.3 3.2.4 diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index 694e2bda..179357d0 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -5,12 +5,12 @@ powerjob tech.powerjob - 4.3.2 + 4.3.3 4.0.0 powerjob-common - 4.3.2 + 4.3.3 jar diff --git a/powerjob-official-processors/pom.xml b/powerjob-official-processors/pom.xml index 5511b4d2..c4cc43a7 100644 --- a/powerjob-official-processors/pom.xml +++ b/powerjob-official-processors/pom.xml @@ -5,12 +5,12 @@ powerjob tech.powerjob - 4.3.2 + 4.3.3 4.0.0 powerjob-official-processors - 4.3.2 + 4.3.3 jar @@ -20,7 +20,7 @@ 5.9.1 1.2.9 - 4.3.2 + 4.3.3 5.2.9.RELEASE 2.1.214 8.0.28 diff --git a/powerjob-remote/pom.xml b/powerjob-remote/pom.xml index b9b330ee..e15ded3d 100644 --- a/powerjob-remote/pom.xml +++ b/powerjob-remote/pom.xml @@ -5,7 +5,7 @@ powerjob tech.powerjob - 4.3.2 + 4.3.3 4.0.0 pom diff --git a/powerjob-remote/powerjob-remote-benchmark/pom.xml b/powerjob-remote/powerjob-remote-benchmark/pom.xml index 0564f87c..de40102e 100644 --- a/powerjob-remote/powerjob-remote-benchmark/pom.xml +++ b/powerjob-remote/powerjob-remote-benchmark/pom.xml @@ -5,7 +5,7 @@ powerjob-remote tech.powerjob - 4.3.2 + 4.3.3 4.0.0 @@ -21,8 +21,8 @@ 1.2.9 2.7.4 - 4.3.2 - 4.3.2 + 4.3.3 + 4.3.3 3.9.0 4.2.9 diff --git a/powerjob-remote/powerjob-remote-framework/pom.xml b/powerjob-remote/powerjob-remote-framework/pom.xml index 5f2441a4..3735d44a 100644 --- a/powerjob-remote/powerjob-remote-framework/pom.xml +++ b/powerjob-remote/powerjob-remote-framework/pom.xml @@ -5,11 +5,11 @@ powerjob-remote tech.powerjob - 4.3.2 + 4.3.3 4.0.0 - 4.3.2 + 4.3.3 powerjob-remote-framework @@ -17,7 +17,7 @@ 8 UTF-8 - 4.3.2 + 4.3.3 0.10.2 diff --git a/powerjob-remote/powerjob-remote-impl-akka/pom.xml b/powerjob-remote/powerjob-remote-impl-akka/pom.xml index 15f58d31..20248943 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/pom.xml +++ b/powerjob-remote/powerjob-remote-impl-akka/pom.xml @@ -5,19 +5,19 @@ powerjob-remote tech.powerjob - 4.3.2 + 4.3.3 4.0.0 powerjob-remote-impl-akka - 4.3.2 + 4.3.3 8 8 UTF-8 - 4.3.2 + 4.3.3 2.6.13 diff --git a/powerjob-remote/powerjob-remote-impl-http/pom.xml b/powerjob-remote/powerjob-remote-impl-http/pom.xml index 557b3b00..f0f2dcee 100644 --- a/powerjob-remote/powerjob-remote-impl-http/pom.xml +++ b/powerjob-remote/powerjob-remote-impl-http/pom.xml @@ -5,12 +5,12 @@ powerjob-remote tech.powerjob - 4.3.2 + 4.3.3 4.0.0 powerjob-remote-impl-http - 4.3.2 + 4.3.3 8 @@ -18,7 +18,7 @@ UTF-8 4.3.7 - 4.3.2 + 4.3.3 diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 6e4bc90b..df3bd5fe 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -5,12 +5,12 @@ powerjob tech.powerjob - 4.3.2 + 4.3.3 4.0.0 powerjob-server - 4.3.2 + 4.3.3 pom @@ -49,9 +49,9 @@ 3.0.10 9.1.6 - 4.3.2 - 4.3.2 - 4.3.2 + 4.3.3 + 4.3.3 + 4.3.3 1.6.14 diff --git a/powerjob-server/powerjob-server-common/pom.xml b/powerjob-server/powerjob-server-common/pom.xml index e38fcd53..bbff066f 100644 --- a/powerjob-server/powerjob-server-common/pom.xml +++ b/powerjob-server/powerjob-server-common/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.2 + 4.3.3 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-core/pom.xml b/powerjob-server/powerjob-server-core/pom.xml index 60beb006..3473f27a 100644 --- a/powerjob-server/powerjob-server-core/pom.xml +++ b/powerjob-server/powerjob-server-core/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.2 + 4.3.3 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java index ee2b517d..07f02e94 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java @@ -10,6 +10,7 @@ import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.request.ServerStopInstanceReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; +import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.remote.framework.base.URL; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.common.timewheel.holder.HashedWheelTimerHolder; @@ -81,6 +82,14 @@ public class InstanceManager implements TransportServiceAware { log.warn("[InstanceManager-{}] can't find InstanceInfo from database", instanceId); return; } + + // 考虑极端情况:Processor 处理耗时小于 server 写 DB 耗时,会导致状态上报时无 taskTracker 地址,此处等待后重新从DB获取数据 GitHub#620 + if (StringUtils.isEmpty(instanceInfo.getTaskTrackerAddress())) { + log.warn("[InstanceManager-{}] TaskTrackerAddress is empty, server will wait then acquire again!", instanceId); + CommonUtils.easySleep(277); + instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); + } + int originStatus = instanceInfo.getStatus(); // 丢弃过期的上报数据 if (req.getReportTime() <= instanceInfo.getLastReportTime()) { diff --git a/powerjob-server/powerjob-server-extension/pom.xml b/powerjob-server/powerjob-server-extension/pom.xml index 0dc14617..aa65f78d 100644 --- a/powerjob-server/powerjob-server-extension/pom.xml +++ b/powerjob-server/powerjob-server-extension/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.2 + 4.3.3 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-migrate/pom.xml b/powerjob-server/powerjob-server-migrate/pom.xml index 2d7fd98d..6d245066 100644 --- a/powerjob-server/powerjob-server-migrate/pom.xml +++ b/powerjob-server/powerjob-server-migrate/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.2 + 4.3.3 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-monitor/pom.xml b/powerjob-server/powerjob-server-monitor/pom.xml index 2a576025..6158c254 100644 --- a/powerjob-server/powerjob-server-monitor/pom.xml +++ b/powerjob-server/powerjob-server-monitor/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.2 + 4.3.3 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-persistence/pom.xml b/powerjob-server/powerjob-server-persistence/pom.xml index 3e157df6..c5494ae9 100644 --- a/powerjob-server/powerjob-server-persistence/pom.xml +++ b/powerjob-server/powerjob-server-persistence/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.2 + 4.3.3 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-remote/pom.xml b/powerjob-server/powerjob-server-remote/pom.xml index a587fe6b..e5040c98 100644 --- a/powerjob-server/powerjob-server-remote/pom.xml +++ b/powerjob-server/powerjob-server-remote/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.2 + 4.3.3 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-starter/pom.xml b/powerjob-server/powerjob-server-starter/pom.xml index b53f59a1..87a35de8 100644 --- a/powerjob-server/powerjob-server-starter/pom.xml +++ b/powerjob-server/powerjob-server-starter/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.2 + 4.3.3 ../pom.xml 4.0.0 @@ -58,6 +58,8 @@ ${springboot.version} tech.powerjob.server.PowerJobServerApplication + + exec diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index 451f7cb0..d125c0bb 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -5,24 +5,24 @@ powerjob tech.powerjob - 4.3.2 + 4.3.3 4.0.0 powerjob-worker-agent - 4.3.2 + 4.3.3 jar - 4.3.2 + 4.3.3 1.2.9 4.3.2 5.3.23 2.3.4.RELEASE - 4.3.2 + 4.3.3 8.0.28 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index 83348b52..cc616a8f 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -5,18 +5,18 @@ powerjob tech.powerjob - 4.3.2 + 4.3.3 4.0.0 powerjob-worker-samples - 4.3.2 + 4.3.3 2.7.4 - 4.3.2 + 4.3.3 1.2.83 - 4.3.2 + 4.3.3 true diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/ZeroCostTestProcessor.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/ZeroCostTestProcessor.java new file mode 100644 index 00000000..4f4fdf4a --- /dev/null +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/ZeroCostTestProcessor.java @@ -0,0 +1,18 @@ +package tech.powerjob.samples.processors.test; + +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.sdk.BasicProcessor; + +/** + * ZeroCostTestProcessor + * + * @author tjq + * @since 2023/5/7 + */ +public class ZeroCostTestProcessor implements BasicProcessor { + @Override + public ProcessResult process(TaskContext context) throws Exception { + return new ProcessResult(true, "zero cost"); + } +} diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/SpringMethodProcessorService.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/SpringMethodProcessorService.java new file mode 100644 index 00000000..ce0777cd --- /dev/null +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/SpringMethodProcessorService.java @@ -0,0 +1,36 @@ +package tech.powerjob.samples.tester; + +import org.springframework.stereotype.Component; +import tech.powerjob.worker.annotation.PowerJobHandler; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.log.OmsLogger; + +@Component(value = "springMethodProcessorService") +public class SpringMethodProcessorService { + + /** + * 处理器配置方法1: 全限定类名#方法名,比如 tech.powerjob.samples.tester.SpringMethodProcessorService#testEmptyReturn + * 处理器配置方法2: SpringBean名称#方法名,比如 springMethodProcessorService#testEmptyReturn + * @param context 必须要有入参 TaskContext,返回值可以是 null,也可以是其他任意类型。正常返回代表成功,抛出异常代表执行失败 + */ + @PowerJobHandler(name = "testEmptyReturn") + public void testEmptyReturn(TaskContext context) { + OmsLogger omsLogger = context.getOmsLogger(); + omsLogger.warn("测试日志"); + } + + + @PowerJobHandler(name = "testNormalReturn") + public String testNormalReturn(TaskContext context) { + OmsLogger omsLogger = context.getOmsLogger(); + omsLogger.warn("测试日志"); + return "testNormalReturn"; + } + + @PowerJobHandler(name = "testThrowException") + public String testThrowException(TaskContext context) { + OmsLogger omsLogger = context.getOmsLogger(); + omsLogger.warn("testThrowException"); + throw new IllegalArgumentException("test"); + } +} diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index 14083922..50c6314c 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -5,16 +5,16 @@ powerjob tech.powerjob - 4.3.2 + 4.3.3 4.0.0 powerjob-worker-spring-boot-starter - 4.3.2 + 4.3.3 jar - 4.3.2 + 4.3.3 2.7.4 diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index 4ba4736f..312ac0b7 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -5,12 +5,12 @@ powerjob tech.powerjob - 4.3.2 + 4.3.3 4.0.0 powerjob-worker - 4.3.2 + 4.3.3 jar @@ -21,10 +21,10 @@ 1.2.9 - 4.3.2 - 4.3.2 - 4.3.2 - 4.3.2 + 4.3.3 + 4.3.3 + 4.3.3 + 4.3.3 diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobSpringWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobSpringWorker.java index 0bc21136..d6fb62f6 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobSpringWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobSpringWorker.java @@ -8,6 +8,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import tech.powerjob.worker.common.PowerJobWorkerConfig; import tech.powerjob.worker.extension.processor.ProcessorFactory; +import tech.powerjob.worker.processor.impl.BuildInSpringMethodProcessorFactory; import tech.powerjob.worker.processor.impl.BuiltInSpringProcessorFactory; import java.util.Collections; @@ -43,12 +44,14 @@ public class PowerJobSpringWorker implements ApplicationContextAware, Initializi public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { BuiltInSpringProcessorFactory springProcessorFactory = new BuiltInSpringProcessorFactory(applicationContext); + BuildInSpringMethodProcessorFactory springMethodProcessorFactory = new BuildInSpringMethodProcessorFactory(applicationContext); // append BuiltInSpringProcessorFactory List processorFactories = Lists.newArrayList( Optional.ofNullable(config.getProcessorFactoryList()) .orElse(Collections.emptyList())); processorFactories.add(springProcessorFactory); + processorFactories.add(springMethodProcessorFactory); config.setProcessorFactoryList(processorFactories); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/annotation/PowerJobHandler.java b/powerjob-worker/src/main/java/tech/powerjob/worker/annotation/PowerJobHandler.java new file mode 100644 index 00000000..b0324df5 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/annotation/PowerJobHandler.java @@ -0,0 +1,26 @@ +package tech.powerjob.worker.annotation; + + +import java.lang.annotation.*; + +/** + * 方法级别的power-job调度 + * PR#610 + * + * @author vannewang + * @since 2023/4/6 + */ +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface PowerJobHandler { + + + /** + * handler name + */ + String name(); + + + +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/ConnectionFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/ConnectionFactory.java index dfbe2cd0..bd502361 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/ConnectionFactory.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/ConnectionFactory.java @@ -57,7 +57,9 @@ public class ConnectionFactory { // JVM 关闭时删除数据库文件 try { FileUtils.forceDeleteOnExit(new File(H2_PATH)); - }catch (Exception ignore) { + log.info("[PowerDatasource] delete worker db file[{}] on JVM exit successfully", H2_PATH); + }catch (Throwable t) { + log.warn("[PowerDatasource] delete file on JVM exit failed: {}", H2_PATH, t); } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/AbstractBuildInSpringProcessorFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/AbstractBuildInSpringProcessorFactory.java new file mode 100644 index 00000000..d4d5a16d --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/AbstractBuildInSpringProcessorFactory.java @@ -0,0 +1,65 @@ +package tech.powerjob.worker.processor.impl; + +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationContext; +import tech.powerjob.common.enums.ProcessorType; +import tech.powerjob.worker.extension.processor.ProcessorFactory; + +import java.util.Set; + +@Slf4j +public abstract class AbstractBuildInSpringProcessorFactory implements ProcessorFactory { + + protected final ApplicationContext applicationContext; + + protected AbstractBuildInSpringProcessorFactory(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + + @Override + public Set supportTypes() { + return Sets.newHashSet(ProcessorType.BUILT_IN.name()); + } + + protected boolean checkCanLoad() { + try { + ApplicationContext.class.getClassLoader(); + return applicationContext != null; + } catch (Throwable ignore) { + } + return false; + } + + + @SuppressWarnings("unchecked") + protected static T getBean(String className, ApplicationContext ctx) throws Exception { + + // 0. 尝试直接用 Bean 名称加载 + try { + final Object bean = ctx.getBean(className); + if (bean != null) { + return (T) bean; + } + } catch (Exception ignore) { + } + + // 1. ClassLoader 存在,则直接使用 clz 加载 + ClassLoader classLoader = ctx.getClassLoader(); + if (classLoader != null) { + return (T) ctx.getBean(classLoader.loadClass(className)); + } + // 2. ClassLoader 不存在(系统类加载器不可见),尝试用类名称小写加载 + String[] split = className.split("\\."); + String beanName = split[split.length - 1]; + // 小写转大写 + char[] cs = beanName.toCharArray(); + cs[0] += 32; + String beanName0 = String.valueOf(cs); + log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", ctx, beanName0); + return (T) ctx.getBean(beanName0); + } + + + +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/BuildInSpringMethodProcessorFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/BuildInSpringMethodProcessorFactory.java new file mode 100644 index 00000000..00021bb6 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/BuildInSpringMethodProcessorFactory.java @@ -0,0 +1,95 @@ +package tech.powerjob.worker.processor.impl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.context.ApplicationContext; +import tech.powerjob.worker.annotation.PowerJobHandler; +import tech.powerjob.worker.extension.processor.ProcessorBean; +import tech.powerjob.worker.extension.processor.ProcessorDefinition; + +import java.lang.reflect.Method; +import java.util.LinkedList; +import java.util.List; + +/** + * 内建的 SpringBean 处理器工厂,用于加载 Spring 管理Bean下的方法(使用PowerJob注解),非核心依赖 + * + * @author wxp + * @since 2023/4/06 + */ + +@Slf4j +public class BuildInSpringMethodProcessorFactory extends AbstractBuildInSpringProcessorFactory { + + private static final List jobHandlerRepository = new LinkedList<>(); + + private final static String DELIMITER = "#"; + + + public BuildInSpringMethodProcessorFactory(ApplicationContext applicationContext) { + super(applicationContext); + } + + + @Override + public ProcessorBean build(ProcessorDefinition processorDefinition) { + try { + boolean canLoad = checkCanLoad(); + if (!canLoad) { + log.info("[ProcessorFactory] can't find Spring env, this processor can't load by 'BuildInSpringMethodProcessorFactory'"); + return null; + } + String processorInfo = processorDefinition.getProcessorInfo(); + if (!processorInfo.contains(DELIMITER)) { + log.info("[ProcessorFactory] can't parse processorDefinition, this processor can't load by 'BuildInSpringMethodProcessorFactory'"); + return null; + } + String[] split = processorInfo.split(DELIMITER); + String methodName = split[1]; + String className = split[0]; + Object bean = getBean(className,applicationContext); + Method[] methods = bean.getClass().getDeclaredMethods(); + for (Method method : methods) { + PowerJobHandler powerJob = method.getAnnotation(PowerJobHandler.class); + if (powerJob == null) { + continue; + } + String name = powerJob.name(); + //匹配到和页面定义相同的methodName + if (!name.equals(methodName)) { + continue; + } + if (name.trim().length() == 0) { + throw new RuntimeException("method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); + } + if (containsJobHandler(name)) { + throw new RuntimeException("jobhandler[" + name + "] naming conflicts."); + } + method.setAccessible(true); + registerJobHandler(methodName); + + MethodBasicProcessor processor = new MethodBasicProcessor(bean, method); + return new ProcessorBean() + .setProcessor(processor) + .setClassLoader(processor.getClass().getClassLoader()); + } + } catch (NoSuchBeanDefinitionException ignore) { + log.warn("[ProcessorFactory] can't find the processor in SPRING"); + } catch (Throwable t) { + log.warn("[ProcessorFactory] load by BuiltInSpringProcessorFactory failed. If you are using Spring, make sure this bean was managed by Spring", t); + } + return null; + } + + + public static void registerJobHandler(String name) { + jobHandlerRepository.add(name); + } + + + private boolean containsJobHandler(String name) { + return jobHandlerRepository.contains(name); + } + + +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/BuiltInSpringProcessorFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/BuiltInSpringProcessorFactory.java index 3ce7481f..b72c540b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/BuiltInSpringProcessorFactory.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/BuiltInSpringProcessorFactory.java @@ -19,17 +19,11 @@ import java.util.Set; * @since 2023/1/17 */ @Slf4j -public class BuiltInSpringProcessorFactory implements ProcessorFactory { +public class BuiltInSpringProcessorFactory extends AbstractBuildInSpringProcessorFactory { - private final ApplicationContext applicationContext; public BuiltInSpringProcessorFactory(ApplicationContext applicationContext) { - this.applicationContext = applicationContext; - } - - @Override - public Set supportTypes() { - return Sets.newHashSet(ProcessorType.BUILT_IN.name()); + super(applicationContext); } @Override @@ -41,8 +35,12 @@ public class BuiltInSpringProcessorFactory implements ProcessorFactory { log.info("[ProcessorFactory] can't find Spring env, this processor can't load by 'BuiltInSpringProcessorFactory'"); return null; } - - BasicProcessor basicProcessor = getBean(processorDefinition.getProcessorInfo(), applicationContext); + String processorInfo = processorDefinition.getProcessorInfo(); + //用于区分方法级别的参数 + if (processorInfo.contains("#")) { + return null; + } + BasicProcessor basicProcessor = getBean(processorInfo, applicationContext); return new ProcessorBean() .setProcessor(basicProcessor) .setClassLoader(basicProcessor.getClass().getClassLoader()); @@ -55,42 +53,5 @@ public class BuiltInSpringProcessorFactory implements ProcessorFactory { return null; } - private boolean checkCanLoad() { - try { - ApplicationContext.class.getClassLoader(); - return applicationContext != null; - } catch (Throwable ignore) { - } - return false; - } - - - @SuppressWarnings("unchecked") - private static T getBean(String className, ApplicationContext ctx) throws Exception { - - // 0. 尝试直接用 Bean 名称加载 - try { - final Object bean = ctx.getBean(className); - if (bean != null) { - return (T) bean; - } - } catch (Exception ignore) { - } - - // 1. ClassLoader 存在,则直接使用 clz 加载 - ClassLoader classLoader = ctx.getClassLoader(); - if (classLoader != null) { - return (T) ctx.getBean(classLoader.loadClass(className)); - } - // 2. ClassLoader 不存在(系统类加载器不可见),尝试用类名称小写加载 - String[] split = className.split("\\."); - String beanName = split[split.length - 1]; - // 小写转大写 - char[] cs = beanName.toCharArray(); - cs[0] += 32; - String beanName0 = String.valueOf(cs); - log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", ctx, beanName0); - return (T) ctx.getBean(beanName0); - } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/MethodBasicProcessor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/MethodBasicProcessor.java new file mode 100644 index 00000000..b148591f --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/MethodBasicProcessor.java @@ -0,0 +1,34 @@ +package tech.powerjob.worker.processor.impl; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.sdk.BasicProcessor; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +class MethodBasicProcessor implements BasicProcessor { + + private final Object bean; + + private final Method method; + + public MethodBasicProcessor(Object bean, Method method) { + this.bean = bean; + this.method = method; + } + + @Override + public ProcessResult process(TaskContext context) throws Exception { + try { + Object result = method.invoke(bean, context); + return new ProcessResult(true, JsonUtils.toJSONString(result)); + } catch (InvocationTargetException ite) { + ExceptionUtils.rethrow(ite.getTargetException()); + } + + return new ProcessResult(false, "IMPOSSIBLE"); + } +}