feat: allow user to extend ProcessorFactory

This commit is contained in:
tjq 2023-01-20 15:54:48 +08:00
parent 16f5e67cf0
commit 847cf23738
15 changed files with 93 additions and 206 deletions

View File

@ -31,9 +31,17 @@ import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.SpringUtils;
import tech.powerjob.worker.core.executor.ExecutorManager;
import tech.powerjob.worker.extension.processor.ProcessorFactory;
import tech.powerjob.worker.persistence.TaskPersistenceService;
import tech.powerjob.worker.processor.PowerJobProcessorLoader;
import tech.powerjob.worker.processor.ProcessorLoader;
import tech.powerjob.worker.processor.impl.BuiltInDefaultProcessorFactory;
import tech.powerjob.worker.processor.impl.JarContainerProcessorFactory;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -95,6 +103,10 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean
final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());
workerRuntime.setExecutorManager(executorManager);
// 初始化 ProcessorLoader
ProcessorLoader processorLoader = buildProcessorLoader(workerRuntime);
workerRuntime.setProcessorLoader(processorLoader);
// 初始化 actor
TaskTrackerActor taskTrackerActor = new TaskTrackerActor(workerRuntime);
ProcessorTrackerActor processorTrackerActor = new ProcessorTrackerActor(workerRuntime);
@ -176,6 +188,17 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean
throw new PowerJobException("no server available!");
}
private ProcessorLoader buildProcessorLoader(WorkerRuntime runtime) {
List<ProcessorFactory> customPF = Optional.ofNullable(runtime.getWorkerConfig().getProcessorFactoryList()).orElse(Collections.emptyList());
List<ProcessorFactory> finalPF = Lists.newArrayList(customPF);
// 后置添加2个系统 ProcessorLoader
finalPF.add(new JarContainerProcessorFactory(runtime));
finalPF.add(new BuiltInDefaultProcessorFactory());
return new PowerJobProcessorLoader(finalPF);
}
@Override
public void destroy() throws Exception {
workerRuntime.getExecutorManager().shutdown();

View File

@ -9,6 +9,7 @@ import tech.powerjob.worker.common.constants.StoreStrategy;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.WorkflowContext;
import tech.powerjob.worker.extension.SystemMetricsCollector;
import tech.powerjob.worker.extension.processor.ProcessorFactory;
import java.util.List;
@ -65,9 +66,14 @@ public class PowerJobWorkerConfig {
* {@link WorkflowContext} max length for #appendedContextData
*/
private int maxAppendedWfContextLength = 8192;
/**
* user-customized system metrics collector
*/
private SystemMetricsCollector systemMetricsCollector;
/**
* Processor factory for custom logic, generally used for IOC framework processor bean injection that is not officially supported by PowerJob
*/
private List<ProcessorFactory> processorFactoryList;
private String tag;
/**

View File

@ -1,12 +1,12 @@
package tech.powerjob.worker.common;
import lombok.Data;
import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.worker.background.OmsLogHandler;
import tech.powerjob.worker.background.ServerDiscoveryService;
import tech.powerjob.worker.background.WorkerHealthReporter;
import tech.powerjob.worker.core.executor.ExecutorManager;
import tech.powerjob.worker.persistence.TaskPersistenceService;
import lombok.Data;
import tech.powerjob.worker.processor.ProcessorLoader;
/**
* store worker's runtime
@ -18,14 +18,22 @@ import lombok.Data;
public class WorkerRuntime {
private Long appId;
/**
* 当前执行器地址
*/
private String workerAddress;
/**
* 用户配置
*/
private PowerJobWorkerConfig workerConfig;
/**
* 通讯器
*/
private Transporter transporter;
private WorkerHealthReporter healthReporter;
/**
* 处理器加载器
*/
private ProcessorLoader processorLoader;
private ExecutorManager executorManager;

View File

@ -1,62 +0,0 @@
package tech.powerjob.worker.core;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import java.util.Map;
/**
* 处理器工厂
*
* @author tjq
* @since 2020/3/23
*/
@Slf4j
public class ProcessorBeanFactory {
/**
* key用来防止不同jar包同名类的冲突 -> (className -> Processor)
*/
private final Map<String, Map<String, BasicProcessor>> cache;
private static final String LOCAL_KEY = "local";
private static volatile ProcessorBeanFactory processorBeanFactory;
public ProcessorBeanFactory() {
// 初始化对象缓存
cache = Maps.newConcurrentMap();
Map<String, BasicProcessor> className2Processor = Maps.newConcurrentMap();
cache.put(LOCAL_KEY, className2Processor);
}
public BasicProcessor getLocalProcessor(String className) {
return cache.get(LOCAL_KEY).computeIfAbsent(className, ignore -> {
try {
Class<?> clz = Class.forName(className);
return (BasicProcessor) clz.getDeclaredConstructor().newInstance();
}catch (Exception e) {
log.warn("[ProcessorBeanFactory] load local Processor(className = {}) failed.", className, e);
ExceptionUtils.rethrow(e);
}
return null;
});
}
public static ProcessorBeanFactory getInstance() {
if (processorBeanFactory != null) {
return processorBeanFactory;
}
synchronized (ProcessorBeanFactory.class) {
if (processorBeanFactory == null) {
processorBeanFactory = new ProcessorBeanFactory();
}
}
return processorBeanFactory;
}
}

View File

@ -1,22 +0,0 @@
package tech.powerjob.worker.core.processor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
/**
* @author Echo009
* @since 2022/9/23
*/
@RequiredArgsConstructor
@Getter
public class ProcessorInfo {
private final BasicProcessor basicProcessor;
private final ClassLoader classLoader;
public static ProcessorInfo of(BasicProcessor basicProcessor, ClassLoader classLoader) {
return new ProcessorInfo(basicProcessor, classLoader);
}
}

View File

@ -1,85 +0,0 @@
package tech.powerjob.worker.core.processor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.SpringUtils;
import tech.powerjob.worker.container.OmsContainer;
import tech.powerjob.worker.container.OmsContainerFactory;
import tech.powerjob.worker.core.ProcessorBeanFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Echo009
* @since 2022/9/19
*/
@Slf4j
public class ProcessorLoader {
private static final Map<String, ProcessorInfo> CACHE;
static {
// init
CACHE = new ConcurrentHashMap<>(128);
}
/**
* 获取处理器
* @param workerRuntime 运行时
* @param processorType 处理器类型
* @param processorInfo 处理器 id 一般是全限定类名
* @return processor
*/
public static ProcessorInfo loadProcessor(WorkerRuntime workerRuntime, String processorType, String processorInfo) {
ProcessorInfo processorInfoHolder = null;
ProcessorType type = ProcessorType.valueOf(processorType);
switch (type) {
case BUILT_IN:
// 先从缓存中取
processorInfoHolder = CACHE.computeIfAbsent(processorInfo, ignore -> {
// 先使用 Spring 加载
if (SpringUtils.supportSpringBean()) {
try {
return ProcessorInfo.of(SpringUtils.getBean(processorInfo),workerRuntime.getClass().getClassLoader());
} catch (Exception e) {
log.warn("[ProcessorLoader] no spring bean of processor(className={}), reason is {}.", processorInfo, ExceptionUtils.getMessage(e));
}
}
// 反射加载
return ProcessorInfo.of(ProcessorBeanFactory.getInstance().getLocalProcessor(processorInfo),workerRuntime.getClass().getClassLoader());
});
break;
case EXTERNAL:
String[] split = processorInfo.split("#");
log.info("[ProcessorLoader] try to load processor({}) in container({})", split[1], split[0]);
OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), workerRuntime);
if (omsContainer != null) {
processorInfoHolder = ProcessorInfo.of(omsContainer.getProcessor(split[1]), omsContainer.getContainerClassLoader());
} else {
log.warn("[ProcessorLoader] load container failed. processor info : {}", processorInfo);
}
break;
default:
log.warn("[ProcessorLoader] unknown processor type: {}.", processorType);
throw new PowerJobException("unknown processor type of " + processorType);
}
if (processorInfoHolder == null) {
log.warn("[ProcessorLoader] fetch Processor(type={},info={}) failed.", processorType, processorInfo);
throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config");
}
return processorInfoHolder;
}
}

View File

@ -15,6 +15,7 @@ import tech.powerjob.worker.core.processor.WorkflowContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import tech.powerjob.worker.extension.processor.ProcessorBean;
import tech.powerjob.worker.log.OmsLogger;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.pojo.model.InstanceInfo;
@ -46,12 +47,8 @@ public class HeavyProcessorRunnable implements Runnable {
private final InstanceInfo instanceInfo;
private final String taskTrackerAddress;
private final TaskDO task;
private final BasicProcessor processor;
private final ProcessorBean processorBean;
private final OmsLogger omsLogger;
/**
* 类加载器
*/
private final ClassLoader classLoader;
/**
* 重试队列ProcessorTracker 将会定期重新上报处理结果
*/
@ -60,6 +57,8 @@ public class HeavyProcessorRunnable implements Runnable {
public void innerRun() throws InterruptedException {
final BasicProcessor processor = processorBean.getProcessor();
String taskId = task.getTaskId();
Long instanceId = task.getInstanceId();
@ -130,6 +129,7 @@ public class HeavyProcessorRunnable implements Runnable {
* MAP_REDUCE => {@link MapReduceProcessor#reduce}
*/
private void handleLastTask(String taskId, Long instanceId, TaskContext taskContext, ExecuteType executeType) {
final BasicProcessor processor = processorBean.getProcessor();
ProcessResult processResult;
Stopwatch stopwatch = Stopwatch.createStarted();
log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
@ -174,6 +174,7 @@ public class HeavyProcessorRunnable implements Runnable {
* 即执行 {@link BroadcastProcessor#preProcess}并通知 TaskerTracker 创建广播子任务
*/
private void handleBroadcastRootTask(Long instanceId, TaskContext taskContext) {
BasicProcessor processor = processorBean.getProcessor();
ProcessResult processResult;
// 广播执行的第一个 task 只执行 preProcess 部分
if (processor instanceof BroadcastProcessor) {
@ -236,7 +237,7 @@ public class HeavyProcessorRunnable implements Runnable {
@SuppressWarnings("squid:S2142")
public void run() {
// 切换线程上下文类加载器否则用的是 Worker 类加载器不存在容器类在序列化/反序列化时会报 ClassNotFoundException
Thread.currentThread().setContextClassLoader(classLoader);
Thread.currentThread().setContextClassLoader(processorBean.getClassLoader());
try {
innerRun();
} catch (InterruptedException ignore) {

View File

@ -5,7 +5,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
@ -13,10 +12,10 @@ import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.core.processor.ProcessorInfo;
import tech.powerjob.worker.core.processor.runnable.HeavyProcessorRunnable;
import tech.powerjob.worker.core.processor.ProcessorLoader;
import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager;
import tech.powerjob.worker.extension.processor.ProcessorBean;
import tech.powerjob.worker.extension.processor.ProcessorDefinition;
import tech.powerjob.worker.log.OmsLogger;
import tech.powerjob.worker.log.OmsLoggerFactory;
import tech.powerjob.worker.persistence.TaskDO;
@ -52,7 +51,7 @@ public class ProcessorTracker {
*/
private Long instanceId;
private ProcessorInfo processorInfo;
private ProcessorBean processorBean;
/**
* 在线日志
*/
@ -111,7 +110,7 @@ public class ProcessorTracker {
// 初始化定时任务
initTimingJob();
// 初始化 Processor
processorInfo = ProcessorLoader.loadProcessor(workerRuntime, instanceInfo.getProcessorType(), instanceInfo.getProcessorInfo());
processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(instanceInfo.getProcessorType()).setProcessorInfo(instanceInfo.getProcessorInfo()));
log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId);
} catch (Throwable t) {
log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, t);
@ -152,8 +151,7 @@ public class ProcessorTracker {
newTask.setInstanceId(instanceInfo.getInstanceId());
newTask.setAddress(taskTrackerAddress);
ClassLoader classLoader = processorInfo.getClassLoader();
HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorInfo.getBasicProcessor(), omsLogger, classLoader, statusReportRetryQueue, workerRuntime);
HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorBean, omsLogger, statusReportRetryQueue, workerRuntime);
try {
threadPool.submit(heavyProcessorRunnable);
success = true;

View File

@ -16,6 +16,8 @@ import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.core.processor.*;
import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager;
import tech.powerjob.worker.core.tracker.task.TaskTracker;
import tech.powerjob.worker.extension.processor.ProcessorBean;
import tech.powerjob.worker.extension.processor.ProcessorDefinition;
import tech.powerjob.worker.log.OmsLoggerFactory;
import java.util.concurrent.Future;
@ -49,7 +51,7 @@ public class LightTaskTracker extends TaskTracker {
/**
* 处理器信息
*/
private final ProcessorInfo processorInfo;
private final ProcessorBean processorBean;
/**
* 上下文
*/
@ -85,7 +87,7 @@ public class LightTaskTracker extends TaskTracker {
// 等待处理
status = TaskStatus.WORKER_RECEIVED;
// 加载 Processor
processorInfo = ProcessorLoader.loadProcessor(workerRuntime, req.getProcessorType(), req.getProcessorInfo());
processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(req.getProcessorType()).setProcessorInfo(req.getProcessorInfo()));
executeThread = new AtomicReference<>();
long delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "15")) * 1000L;
// 初始延迟加入随机值避免在高并发场景下所有请求集中在一个时间段
@ -199,14 +201,14 @@ public class LightTaskTracker extends TaskTracker {
// 开始执行时提交任务判断是否超时
ProcessResult res = null;
do {
Thread.currentThread().setContextClassLoader(processorInfo.getClassLoader());
Thread.currentThread().setContextClassLoader(processorBean.getClassLoader());
if (res != null && !res.isSuccess()) {
// 重试
taskContext.setCurrentRetryTimes(taskContext.getCurrentRetryTimes() + 1);
log.warn("[TaskTracker-{}] process failed, TaskTracker will have a retry,current retryTimes : {}", instanceId, taskContext.getCurrentRetryTimes());
}
try {
res = processorInfo.getBasicProcessor().process(taskContext);
res = processorBean.getProcessor().process(taskContext);
} catch (InterruptedException e) {
log.warn("[TaskTracker-{}] task has been interrupted !", instanceId, e);
Thread.currentThread().interrupt();

View File

@ -10,8 +10,9 @@ import java.util.Objects;
/**
* 处理器定义
* 对外暴露的对象尽量不要直接使用构造器等不方便后续扩展的 APIGetter & Setter 保兼容
*
* @author tjq
* @author Echo009
* @since 2023/1/17
*/
@Getter

View File

@ -1,6 +1,5 @@
package tech.powerjob.worker.processor;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.worker.extension.processor.ProcessorBean;
@ -11,6 +10,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* PowerJobProcessorLoader
@ -19,29 +19,35 @@ import java.util.Optional;
* @since 2023/1/17
*/
@Slf4j
public class PowerJobProcessorLoader {
public class PowerJobProcessorLoader implements ProcessorLoader {
private final List<ProcessorFactory> processorFactoryList;
private final Map<ProcessorDefinition, ProcessorBean> def2Bean = Maps.newConcurrentMap();
private final Map<ProcessorDefinition, ProcessorBean> def2Bean = new ConcurrentHashMap<>(128);
public PowerJobProcessorLoader(List<ProcessorFactory> processorFactoryList) {
this.processorFactoryList = processorFactoryList;
}
@Override
public ProcessorBean load(ProcessorDefinition definition) {
return def2Bean.computeIfAbsent(definition, ignore -> {
final String processorType = definition.getProcessorType();
log.info("[ProcessorFactory] start to load Processor: {}", definition);
for (ProcessorFactory pf : processorFactoryList) {
if (!Optional.ofNullable(pf.supportTypes()).orElse(Collections.emptySet()).contains(definition.getProcessorType())) {
final String pfName = pf.getClass().getSimpleName();
if (!Optional.ofNullable(pf.supportTypes()).orElse(Collections.emptySet()).contains(processorType)) {
log.info("[ProcessorFactory] [{}] can't load type={}, skip!", pfName, processorType);
continue;
}
log.info("[ProcessorFactory] [{}] try to load processor: {}", pfName, definition);
try {
ProcessorBean processorBean = pf.build(definition);
if (processorBean != null) {
log.info("[ProcessorFactory] [{}] load processor successfully: {}", pfName, definition);
return processorBean;
}
} catch (Throwable t) {
log.error("[ProcessorFactory] [{}] load processor failed: {}", pf.getClass().getSimpleName(), definition, t);
log.error("[ProcessorFactory] [{}] load processor failed: {}", pfName, definition, t);
}
}
throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config");

View File

@ -0,0 +1,15 @@
package tech.powerjob.worker.processor;
import tech.powerjob.worker.extension.processor.ProcessorBean;
import tech.powerjob.worker.extension.processor.ProcessorDefinition;
/**
* 内部使用的 Processor 加载器
*
* @author Echo009
* @since 2023/1/20
*/
public interface ProcessorLoader {
ProcessorBean load(ProcessorDefinition definition);
}

View File

@ -27,13 +27,11 @@ public class BuiltInDefaultProcessorFactory implements ProcessorFactory {
@Override
public ProcessorBean build(ProcessorDefinition processorDefinition) {
log.info("[ProcessorFactory] use 'BuiltInDefaultProcessorFactory' to load, processorDefinition is: {}", processorDefinition);
String className = processorDefinition.getProcessorInfo();
try {
Class<?> clz = Class.forName(className);
BasicProcessor basicProcessor = (BasicProcessor) clz.getDeclaredConstructor().newInstance();
log.info("[ProcessorFactory] use 'BuiltInDefaultProcessorFactory' load processor successfully: {}", processorDefinition);
return new ProcessorBean()
.setProcessor(basicProcessor)
.setClassLoader(basicProcessor.getClass().getClassLoader());

View File

@ -35,7 +35,6 @@ public class BuiltInSpringProcessorFactory implements ProcessorFactory {
@Override
public ProcessorBean build(ProcessorDefinition processorDefinition) {
log.info("[ProcessorFactory] use 'BuiltInSpringProcessorFactory' to load, processorDefinition is: {}", processorDefinition);
try {
boolean canLoad = checkCanLoad();
if (!canLoad) {

View File

@ -34,7 +34,6 @@ public class JarContainerProcessorFactory implements ProcessorFactory {
@Override
public ProcessorBean build(ProcessorDefinition processorDefinition) {
log.info("[ProcessorFactory] use 'JarContainerProcessorFactory' to load, processorDefinition is: {}", processorDefinition);
String processorInfo = processorDefinition.getProcessorInfo();
String[] split = processorInfo.split("#");