diff --git a/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/MinioOssServiceTest.java b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/MinioOssServiceTest.java index 57560be8..0ed5bf4b 100644 --- a/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/MinioOssServiceTest.java +++ b/powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/MinioOssServiceTest.java @@ -1,18 +1,18 @@ package tech.powerjob.server.persistence.storage.impl; -import org.apache.commons.lang3.exception.ExceptionUtils; +import lombok.extern.slf4j.Slf4j; import tech.powerjob.server.extension.dfs.DFsService; import java.util.Optional; -import static org.junit.jupiter.api.Assertions.*; - /** - * desc + * MinioOssServiceTest + * 测试需要先本地部署 minio,因此捕获异常,失败也不阻断测试 * * @author tjq * @since 2024/2/26 */ +@Slf4j class MinioOssServiceTest extends AbstractDfsServiceTest { @Override @@ -22,7 +22,8 @@ class MinioOssServiceTest extends AbstractDfsServiceTest { aliOssService.initOssClient("http://192.168.124.23:9000", "pj2","testAk", "testSktestSktestSk"); return Optional.of(aliOssService); } catch (Exception e) { - ExceptionUtils.rethrow(e); + // 仅异常提醒 + log.error("[MinioOssServiceTest] test exception!", e); } return Optional.empty(); } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/selector/impl/SpecifyTaskTrackerSelector.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/selector/impl/SpecifyTaskTrackerSelector.java index de2b738b..cbad5331 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/selector/impl/SpecifyTaskTrackerSelector.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/selector/impl/SpecifyTaskTrackerSelector.java @@ -1,7 +1,7 @@ package tech.powerjob.server.remote.worker.selector.impl; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.compress.utils.Lists; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import tech.powerjob.common.enums.DispatchStrategy; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/container/OmsContainerFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/container/OmsContainerFactory.java index f857e43c..0f62f0d9 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/container/OmsContainerFactory.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/container/OmsContainerFactory.java @@ -92,6 +92,7 @@ public class OmsContainerFactory { try { if (!jarFile.exists()) { + log.info("[OmsContainer-{}] container not exist(path={}), try to download from server!", containerId, jarFile.getPath()); FileUtils.forceMkdirParent(jarFile); FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000); log.info("[OmsContainer-{}] download jar successfully, path={}", containerId, jarFile.getPath()); @@ -107,6 +108,7 @@ public class OmsContainerFactory { if (oldContainer != null) { // 销毁旧容器 + log.info("[OmsContainer-{}] start to destroy old container(version={})", containerId, oldContainer.getVersion()); oldContainer.destroy(); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/extension/processor/ProcessorBean.java b/powerjob-worker/src/main/java/tech/powerjob/worker/extension/processor/ProcessorBean.java index c7238f33..28c31127 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/extension/processor/ProcessorBean.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/extension/processor/ProcessorBean.java @@ -25,4 +25,10 @@ public class ProcessorBean { */ private transient ClassLoader classLoader; + /** + * Bean 是否稳定 + * SpringBean / 普通Java 对象,在整个 JVM 生命周期内都不会变,可声明为稳定,在上层缓存,避免每次都要重现 build processor + * 对于动态容器,可能在部署后改变,则需要声明为不稳定 + */ + private boolean stable = true; } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/PowerJobProcessorLoader.java b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/PowerJobProcessorLoader.java index 00be4fa5..f7e3af68 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/PowerJobProcessorLoader.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/PowerJobProcessorLoader.java @@ -30,27 +30,36 @@ public class PowerJobProcessorLoader implements ProcessorLoader { @Override public ProcessorBean load(ProcessorDefinition definition) { - return def2Bean.computeIfAbsent(definition, ignore -> { - final String processorType = definition.getProcessorType(); - log.info("[ProcessorFactory] start to load Processor: {}", definition); - for (ProcessorFactory pf : processorFactoryList) { - final String pfName = pf.getClass().getSimpleName(); - if (!Optional.ofNullable(pf.supportTypes()).orElse(Collections.emptySet()).contains(processorType)) { - log.info("[ProcessorFactory] [{}] can't load type={}, skip!", pfName, processorType); - continue; - } - log.info("[ProcessorFactory] [{}] try to load processor: {}", pfName, definition); - try { - ProcessorBean processorBean = pf.build(definition); - if (processorBean != null) { - log.info("[ProcessorFactory] [{}] load processor successfully: {}", pfName, definition); - return processorBean; - } - } catch (Throwable t) { - log.error("[ProcessorFactory] [{}] load processor failed: {}", pfName, definition, t); - } + + ProcessorBean pBean = def2Bean.computeIfAbsent(definition, ignore -> buildProcessorBean(definition)); + + if (pBean.isStable()) { + return pBean; + } + + return buildProcessorBean(definition); + } + + private ProcessorBean buildProcessorBean(ProcessorDefinition definition) { + final String processorType = definition.getProcessorType(); + log.info("[ProcessorFactory] start to load Processor: {}", definition); + for (ProcessorFactory pf : processorFactoryList) { + final String pfName = pf.getClass().getSimpleName(); + if (!Optional.ofNullable(pf.supportTypes()).orElse(Collections.emptySet()).contains(processorType)) { + log.info("[ProcessorFactory] [{}] can't load type={}, skip!", pfName, processorType); + continue; } - throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config"); - }); + log.info("[ProcessorFactory] [{}] try to load processor: {}", pfName, definition); + try { + ProcessorBean processorBean = pf.build(definition); + if (processorBean != null) { + log.info("[ProcessorFactory] [{}] load processor successfully: {}", pfName, definition); + return processorBean; + } + } catch (Throwable t) { + log.error("[ProcessorFactory] [{}] load processor failed: {}", pfName, definition, t); + } + } + throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config"); } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java index 26762be5..59c4e002 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java @@ -46,7 +46,9 @@ public class JarContainerProcessorFactory implements ProcessorFactory { if (omsContainer != null) { return new ProcessorBean() .setProcessor(omsContainer.getProcessor(className)) - .setClassLoader(omsContainer.getContainerClassLoader()); + .setClassLoader(omsContainer.getContainerClassLoader()) + .setStable(false) + ; } else { log.warn("[ProcessorFactory] load container failed. processor info : {}", processorInfo); }