feat: remove spring in PowerJobWorker

This commit is contained in:
tjq 2023-01-20 16:34:03 +08:00
parent f6a6914f91
commit 503e9db5c2
14 changed files with 116 additions and 78 deletions

View File

@ -0,0 +1,33 @@
package tech.powerjob.common.utils;
import java.util.Collection;
import java.util.Map;
/**
* CollectionUtils
*
* @author tjq
* @since 2023/1/20
*/
public class CollectionUtils {
/**
* Return {@code true} if the supplied Collection is {@code null} or empty.
* Otherwise, return {@code false}.
* @param collection the Collection to check
* @return whether the given Collection is empty
*/
public static boolean isEmpty(Collection<?> collection) {
return (collection == null || collection.isEmpty());
}
/**
* Return {@code true} if the supplied Map is {@code null} or empty.
* Otherwise, return {@code false}.
* @param map the Map to check
* @return whether the given Map is empty
*/
public static boolean isEmpty(Map<?, ?> map) {
return (map == null || map.isEmpty());
}
}

View File

@ -57,8 +57,7 @@ public class MainApplication implements Runnable {
cfg.setMaxResultLength(length);
cfg.setTag(tag);
PowerJobWorker worker = new PowerJobWorker();
worker.setConfig(cfg);
PowerJobWorker worker = new PowerJobWorker(cfg);
worker.init();
}catch (Exception e) {

View File

@ -7,6 +7,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.worker.PowerJobSpringWorker;
import tech.powerjob.worker.PowerJobWorker;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
@ -83,9 +84,7 @@ public class PowerJobAutoConfiguration {
/*
* Create OhMyWorker object and set properties.
*/
PowerJobWorker ohMyWorker = new PowerJobWorker();
ohMyWorker.setConfig(config);
return ohMyWorker;
return new PowerJobSpringWorker(config);
}
}

View File

@ -28,11 +28,12 @@
<dependencies>
<!-- Spring 依赖 -->
<!-- Spring 依赖(非强依赖) -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
<scope>provided</scope>
</dependency>
<!-- PowerJob 通讯框架 -->

View File

@ -0,0 +1,49 @@
package tech.powerjob.worker;
import com.google.common.collect.Lists;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.extension.processor.ProcessorFactory;
import tech.powerjob.worker.processor.impl.BuiltInSpringProcessorFactory;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
/**
* Spring 项目中的 Worker 启动器
* 能够获取到由 Spring IOC 容器管理的 processor
*
* @author tjq
* @since 2023/1/20
*/
public class PowerJobSpringWorker extends PowerJobWorker implements ApplicationContextAware, InitializingBean, DisposableBean {
public PowerJobSpringWorker(PowerJobWorkerConfig config) {
super(config);
}
@Override
public void afterPropertiesSet() throws Exception {
init();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
BuiltInSpringProcessorFactory springProcessorFactory = new BuiltInSpringProcessorFactory(applicationContext);
// append BuiltInSpringProcessorFactory
PowerJobWorkerConfig workerConfig = workerRuntime.getWorkerConfig();
List<ProcessorFactory> processorFactories = Lists.newArrayList(
Optional.ofNullable(workerConfig.getProcessorFactoryList())
.orElse(Collections.emptyList()));
processorFactories.add(springProcessorFactory);
workerConfig.setProcessorFactoryList(processorFactories);
}
}

View File

@ -3,11 +3,6 @@ package tech.powerjob.worker;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
@ -29,7 +24,6 @@ import tech.powerjob.worker.background.WorkerHealthReporter;
import tech.powerjob.worker.common.PowerBannerPrinter;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.SpringUtils;
import tech.powerjob.worker.core.executor.ExecutorManager;
import tech.powerjob.worker.extension.processor.ProcessorFactory;
import tech.powerjob.worker.persistence.TaskPersistenceService;
@ -52,21 +46,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
* @since 2020/3/16
*/
@Slf4j
public class PowerJobWorker implements ApplicationContextAware, InitializingBean, DisposableBean {
public class PowerJobWorker {
private final RemoteEngine remoteEngine;
protected final WorkerRuntime workerRuntime;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final WorkerRuntime workerRuntime = new WorkerRuntime();
private final RemoteEngine remoteEngine = new PowerJobRemoteEngine();
private final AtomicBoolean initialized = new AtomicBoolean();
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringUtils.inject(applicationContext);
}
@Override
public void afterPropertiesSet() throws Exception {
init();
public PowerJobWorker(PowerJobWorkerConfig config) {
this.workerRuntime = new WorkerRuntime();
this.remoteEngine = new PowerJobRemoteEngine();
workerRuntime.setWorkerConfig(config);
}
public void init() throws Exception {
@ -152,10 +140,6 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean
}
}
public void setConfig(PowerJobWorkerConfig config) {
workerRuntime.setWorkerConfig(config);
}
@SuppressWarnings("rawtypes")
private void assertAppName() {
@ -199,7 +183,6 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean
return new PowerJobProcessorLoader(finalPF);
}
@Override
public void destroy() throws Exception {
workerRuntime.getExecutorManager().shutdown();
remoteEngine.close();

View File

@ -1,8 +1,8 @@
package tech.powerjob.worker.actors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.ProcessType;

View File

@ -3,15 +3,15 @@ package tech.powerjob.worker.background;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.core.tracker.task.heavy.HeavyTaskTracker;
import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager;
import tech.powerjob.worker.core.tracker.task.heavy.HeavyTaskTracker;
import java.util.List;
import java.util.Map;
@ -55,7 +55,7 @@ public class ServerDiscoveryService {
public void start(ScheduledExecutorService timingPool) {
this.currentServerAddress = discovery();
if (org.springframework.util.StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) {
if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) {
throw new PowerJobException("can't find any available server, this worker has been quarantined.");
}
// 这里必须保证成功

View File

@ -12,28 +12,6 @@ import org.springframework.context.ApplicationContext;
@Slf4j
public class SpringUtils {
private static boolean supportSpringBean = false;
private static ApplicationContext context;
public static void inject(ApplicationContext ctx) {
context = ctx;
supportSpringBean = true;
}
public static boolean supportSpringBean() {
return supportSpringBean;
}
public static <T> T getBean(Class<T> clz) {
return context.getBean(clz);
}
public static <T> T getBean(String className) throws Exception {
return getBean(className, context);
}
@SuppressWarnings("unchecked")
public static <T> T getBean(String className, ApplicationContext ctx) throws Exception {
// 1. ClassLoader 存在则直接使用 clz 加载
@ -48,7 +26,7 @@ public class SpringUtils {
char[] cs = beanName.toCharArray();
cs[0] += 32;
String beanName0 = String.valueOf(cs);
log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", context, beanName0);
log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", ctx, beanName0);
return (T) ctx.getBean(beanName0);
}

View File

@ -2,8 +2,8 @@ package tech.powerjob.worker.core.processor.sdk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.exception.PowerJobCheckedException;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.worker.common.ThreadLocalStore;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;

View File

@ -4,10 +4,10 @@ import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;

View File

@ -4,7 +4,6 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.SystemInstanceResult;
@ -14,17 +13,15 @@ import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.persistence.TaskDO;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -176,7 +173,6 @@ public class CommonTaskTracker extends HeavyTaskTracker {
finished.set(true);
List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
success = false;
result = SystemInstanceResult.UNKNOWN_BUG;
log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
} else {

View File

@ -1,17 +1,23 @@
package tech.powerjob.worker.core.tracker.task.heavy;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Stopwatch;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.WorkerQueryExecutorClusterReq;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
@ -26,12 +32,6 @@ import tech.powerjob.worker.persistence.TaskPersistenceService;
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq;
import com.google.common.base.Stopwatch;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import java.util.List;

View File

@ -1,17 +1,17 @@
package tech.powerjob.worker.persistence;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.SupplierPlus;
import tech.powerjob.worker.common.constants.StoreStrategy;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.core.processor.TaskResult;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List;