From 503e9db5c2bacc76d14eae5f6d833462f4e28f51 Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 20 Jan 2023 16:34:03 +0800 Subject: [PATCH] feat: remove spring in PowerJobWorker --- .../common/utils/CollectionUtils.java | 33 +++++++++++++ .../tech/powerjob/agent/MainApplication.java | 3 +- .../PowerJobAutoConfiguration.java | 5 +- powerjob-worker/pom.xml | 3 +- .../powerjob/worker/PowerJobSpringWorker.java | 49 +++++++++++++++++++ .../tech/powerjob/worker/PowerJobWorker.java | 33 +++---------- .../worker/actors/ProcessorTrackerActor.java | 2 +- .../background/ServerDiscoveryService.java | 6 +-- .../worker/common/utils/SpringUtils.java | 24 +-------- .../core/processor/sdk/MapProcessor.java | 2 +- .../tracker/processor/ProcessorTracker.java | 2 +- .../tracker/task/heavy/CommonTaskTracker.java | 6 +-- .../tracker/task/heavy/HeavyTaskTracker.java | 18 +++---- .../persistence/TaskPersistenceService.java | 8 +-- 14 files changed, 116 insertions(+), 78 deletions(-) create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/utils/CollectionUtils.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobSpringWorker.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/CollectionUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/CollectionUtils.java new file mode 100644 index 00000000..2ac86ef4 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/CollectionUtils.java @@ -0,0 +1,33 @@ +package tech.powerjob.common.utils; + +import java.util.Collection; +import java.util.Map; + +/** + * CollectionUtils + * + * @author tjq + * @since 2023/1/20 + */ +public class CollectionUtils { + + /** + * Return {@code true} if the supplied Collection is {@code null} or empty. + * Otherwise, return {@code false}. + * @param collection the Collection to check + * @return whether the given Collection is empty + */ + public static boolean isEmpty(Collection collection) { + return (collection == null || collection.isEmpty()); + } + + /** + * Return {@code true} if the supplied Map is {@code null} or empty. + * Otherwise, return {@code false}. + * @param map the Map to check + * @return whether the given Map is empty + */ + public static boolean isEmpty(Map map) { + return (map == null || map.isEmpty()); + } +} diff --git a/powerjob-worker-agent/src/main/java/tech/powerjob/agent/MainApplication.java b/powerjob-worker-agent/src/main/java/tech/powerjob/agent/MainApplication.java index b89767f1..4b84bfac 100644 --- a/powerjob-worker-agent/src/main/java/tech/powerjob/agent/MainApplication.java +++ b/powerjob-worker-agent/src/main/java/tech/powerjob/agent/MainApplication.java @@ -57,8 +57,7 @@ public class MainApplication implements Runnable { cfg.setMaxResultLength(length); cfg.setTag(tag); - PowerJobWorker worker = new PowerJobWorker(); - worker.setConfig(cfg); + PowerJobWorker worker = new PowerJobWorker(cfg); worker.init(); }catch (Exception e) { diff --git a/powerjob-worker-spring-boot-starter/src/main/java/tech/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java b/powerjob-worker-spring-boot-starter/src/main/java/tech/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java index a5e079c2..f1e82d99 100644 --- a/powerjob-worker-spring-boot-starter/src/main/java/tech/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java +++ b/powerjob-worker-spring-boot-starter/src/main/java/tech/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java @@ -7,6 +7,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.worker.PowerJobSpringWorker; import tech.powerjob.worker.PowerJobWorker; import tech.powerjob.worker.common.PowerJobWorkerConfig; @@ -83,9 +84,7 @@ public class PowerJobAutoConfiguration { /* * Create OhMyWorker object and set properties. */ - PowerJobWorker ohMyWorker = new PowerJobWorker(); - ohMyWorker.setConfig(config); - return ohMyWorker; + return new PowerJobSpringWorker(config); } } diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index 5f1d8422..045254d5 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -28,11 +28,12 @@ - + org.springframework spring-context ${spring.version} + provided diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobSpringWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobSpringWorker.java new file mode 100644 index 00000000..c089fb43 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobSpringWorker.java @@ -0,0 +1,49 @@ +package tech.powerjob.worker; + +import com.google.common.collect.Lists; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import tech.powerjob.worker.common.PowerJobWorkerConfig; +import tech.powerjob.worker.extension.processor.ProcessorFactory; +import tech.powerjob.worker.processor.impl.BuiltInSpringProcessorFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** + * Spring 项目中的 Worker 启动器 + * 能够获取到由 Spring IOC 容器管理的 processor + * + * @author tjq + * @since 2023/1/20 + */ +public class PowerJobSpringWorker extends PowerJobWorker implements ApplicationContextAware, InitializingBean, DisposableBean { + + + public PowerJobSpringWorker(PowerJobWorkerConfig config) { + super(config); + } + + @Override + public void afterPropertiesSet() throws Exception { + init(); + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + BuiltInSpringProcessorFactory springProcessorFactory = new BuiltInSpringProcessorFactory(applicationContext); + + // append BuiltInSpringProcessorFactory + PowerJobWorkerConfig workerConfig = workerRuntime.getWorkerConfig(); + List processorFactories = Lists.newArrayList( + Optional.ofNullable(workerConfig.getProcessorFactoryList()) + .orElse(Collections.emptyList())); + processorFactories.add(springProcessorFactory); + workerConfig.setProcessorFactoryList(processorFactories); + } + +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index f4b02697..bd50df56 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -3,11 +3,6 @@ package tech.powerjob.worker; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.common.serialize.JsonUtils; @@ -29,7 +24,6 @@ import tech.powerjob.worker.background.WorkerHealthReporter; import tech.powerjob.worker.common.PowerBannerPrinter; import tech.powerjob.worker.common.PowerJobWorkerConfig; import tech.powerjob.worker.common.WorkerRuntime; -import tech.powerjob.worker.common.utils.SpringUtils; import tech.powerjob.worker.core.executor.ExecutorManager; import tech.powerjob.worker.extension.processor.ProcessorFactory; import tech.powerjob.worker.persistence.TaskPersistenceService; @@ -52,21 +46,15 @@ import java.util.concurrent.atomic.AtomicBoolean; * @since 2020/3/16 */ @Slf4j -public class PowerJobWorker implements ApplicationContextAware, InitializingBean, DisposableBean { +public class PowerJobWorker { + private final RemoteEngine remoteEngine; + protected final WorkerRuntime workerRuntime; + private final AtomicBoolean initialized = new AtomicBoolean(false); - private final WorkerRuntime workerRuntime = new WorkerRuntime(); - - private final RemoteEngine remoteEngine = new PowerJobRemoteEngine(); - private final AtomicBoolean initialized = new AtomicBoolean(); - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - SpringUtils.inject(applicationContext); - } - - @Override - public void afterPropertiesSet() throws Exception { - init(); + public PowerJobWorker(PowerJobWorkerConfig config) { + this.workerRuntime = new WorkerRuntime(); + this.remoteEngine = new PowerJobRemoteEngine(); + workerRuntime.setWorkerConfig(config); } public void init() throws Exception { @@ -152,10 +140,6 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean } } - public void setConfig(PowerJobWorkerConfig config) { - workerRuntime.setWorkerConfig(config); - } - @SuppressWarnings("rawtypes") private void assertAppName() { @@ -199,7 +183,6 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean return new PowerJobProcessorLoader(finalPF); } - @Override public void destroy() throws Exception { workerRuntime.getExecutorManager().shutdown(); remoteEngine.close(); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java index 3b933386..846cfa37 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/actors/ProcessorTrackerActor.java @@ -1,8 +1,8 @@ package tech.powerjob.worker.actors; import lombok.extern.slf4j.Slf4j; -import org.springframework.util.CollectionUtils; import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.remote.framework.actor.Handler; import tech.powerjob.remote.framework.actor.ProcessType; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java index 7e25a62f..89f07874 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java @@ -3,15 +3,15 @@ package tech.powerjob.worker.background; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.springframework.util.CollectionUtils; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.HttpUtils; import tech.powerjob.worker.common.PowerJobWorkerConfig; -import tech.powerjob.worker.core.tracker.task.heavy.HeavyTaskTracker; import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager; +import tech.powerjob.worker.core.tracker.task.heavy.HeavyTaskTracker; import java.util.List; import java.util.Map; @@ -55,7 +55,7 @@ public class ServerDiscoveryService { public void start(ScheduledExecutorService timingPool) { this.currentServerAddress = discovery(); - if (org.springframework.util.StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) { + if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) { throw new PowerJobException("can't find any available server, this worker has been quarantined."); } // 这里必须保证成功 diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/SpringUtils.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/SpringUtils.java index 59ad2473..156e7ffb 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/SpringUtils.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/SpringUtils.java @@ -12,28 +12,6 @@ import org.springframework.context.ApplicationContext; @Slf4j public class SpringUtils { - private static boolean supportSpringBean = false; - - private static ApplicationContext context; - - public static void inject(ApplicationContext ctx) { - context = ctx; - supportSpringBean = true; - } - - public static boolean supportSpringBean() { - return supportSpringBean; - } - - public static T getBean(Class clz) { - return context.getBean(clz); - } - - - public static T getBean(String className) throws Exception { - return getBean(className, context); - } - @SuppressWarnings("unchecked") public static T getBean(String className, ApplicationContext ctx) throws Exception { // 1. ClassLoader 存在,则直接使用 clz 加载 @@ -48,7 +26,7 @@ public class SpringUtils { char[] cs = beanName.toCharArray(); cs[0] += 32; String beanName0 = String.valueOf(cs); - log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", context, beanName0); + log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", ctx, beanName0); return (T) ctx.getBean(beanName0); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java index db2dc6ab..1ec00177 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/sdk/MapProcessor.java @@ -2,8 +2,8 @@ package tech.powerjob.worker.core.processor.sdk; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.util.CollectionUtils; import tech.powerjob.common.exception.PowerJobCheckedException; +import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.worker.common.ThreadLocalStore; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 51d84715..7bc79f68 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -4,10 +4,10 @@ import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.springframework.util.CollectionUtils; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskStatus; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java index 126d8204..8853f310 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java @@ -4,7 +4,6 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.ToString; import lombok.extern.slf4j.Slf4j; -import org.springframework.util.CollectionUtils; import tech.powerjob.common.PowerJobDKey; import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.SystemInstanceResult; @@ -14,17 +13,15 @@ import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; -import tech.powerjob.common.response.AskResponse; +import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.persistence.TaskDO; -import java.time.Duration; import java.util.List; import java.util.Optional; -import java.util.concurrent.CompletionStage; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -176,7 +173,6 @@ public class CommonTaskTracker extends HeavyTaskTracker { finished.set(true); List allTask = taskPersistenceService.getAllTask(instanceId, instanceId); if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) { - success = false; result = SystemInstanceResult.UNKNOWN_BUG; log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId); } else { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 86e93126..4f75e0ab 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -1,17 +1,23 @@ package tech.powerjob.worker.core.tracker.task.heavy; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Stopwatch; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Lists; import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.springframework.util.CollectionUtils; -import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.WorkerQueryExecutorClusterReq; import tech.powerjob.common.response.AskResponse; -import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.CollectionUtils; +import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.SegmentLock; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; @@ -26,12 +32,6 @@ import tech.powerjob.worker.persistence.TaskPersistenceService; import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq; -import com.google.common.base.Stopwatch; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.collect.Lists; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; import java.util.List; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java index 97c5794d..f490146c 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java @@ -1,17 +1,17 @@ package tech.powerjob.worker.persistence; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.SupplierPlus; import tech.powerjob.worker.common.constants.StoreStrategy; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.processor.TaskResult; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import lombok.extern.slf4j.Slf4j; -import org.springframework.util.CollectionUtils; import java.util.Collections; import java.util.List;