From cc7a63c69f3bd08d213f07070abaf1b6f9b63257 Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 17 Jan 2023 22:53:32 +0800 Subject: [PATCH] feat: JarContainerProcessorFactory --- .../impl/JarContainerProcessorFactory.java | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java new file mode 100644 index 00000000..9d815dbc --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java @@ -0,0 +1,60 @@ +package tech.powerjob.worker.processor.impl; + +import akka.actor.ActorSelection; +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.enums.ProcessorType; +import tech.powerjob.worker.common.WorkerRuntime; +import tech.powerjob.worker.common.utils.AkkaUtils; +import tech.powerjob.worker.container.OmsContainer; +import tech.powerjob.worker.container.OmsContainerFactory; +import tech.powerjob.worker.extension.processor.ProcessorBean; +import tech.powerjob.worker.extension.processor.ProcessorDefinition; +import tech.powerjob.worker.extension.processor.ProcessorFactory; + +import java.util.Set; + +/** + * 加载容器处理器 + * + * @author tjq + * @since 2023/1/17 + */ +@Slf4j +public class JarContainerProcessorFactory implements ProcessorFactory { + + private final WorkerRuntime workerRuntime; + + public JarContainerProcessorFactory(WorkerRuntime workerRuntime) { + this.workerRuntime = workerRuntime; + } + + @Override + public Set supportTypes() { + return Sets.newHashSet(ProcessorType.EXTERNAL.name()); + } + + @Override + public ProcessorBean build(ProcessorDefinition processorDefinition) { + log.info("[ProcessorFactory] use 'JarContainerProcessorFactory' to load, processorDefinition is: {}", processorDefinition); + + String processorInfo = processorDefinition.getProcessorInfo(); + String[] split = processorInfo.split("#"); + String containerName = split[0]; + String className = split[1]; + + log.info("[ProcessorFactory] try to load processor({}) in container({})", className, containerName); + + String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress()); + ActorSelection actorSelection = workerRuntime.getActorSystem().actorSelection(serverPath); + OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerName), actorSelection); + if (omsContainer != null) { + return new ProcessorBean() + .setProcessor(omsContainer.getProcessor(className)) + .setClassLoader(omsContainer.getContainerClassLoader()); + } else { + log.warn("[ProcessorFactory] load container failed. processor info : {}", processorInfo); + } + return null; + } +}