From a138f9c8cccfeaf513b0e327c653371558a7c480 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 2 Sep 2023 11:53:01 +0800 Subject: [PATCH] feat: support lazy init#725 --- .../common/response/ObjectResultDTO.java | 10 ++ .../tech/powerjob/worker/PowerJobWorker.java | 56 ++-------- .../worker/background/OmsLogHandler.java | 1 + .../worker/background/discovery/AppInfo.java | 22 ++++ .../PowerJobServerDiscoveryService.java} | 104 +++++++++++++++--- .../discovery/ServerDiscoveryService.java | 30 +++++ .../powerjob/worker/common/WorkerRuntime.java | 14 ++- 7 files changed, 171 insertions(+), 66 deletions(-) create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/response/ObjectResultDTO.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/background/discovery/AppInfo.java rename powerjob-worker/src/main/java/tech/powerjob/worker/background/{ServerDiscoveryService.java => discovery/PowerJobServerDiscoveryService.java} (58%) create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/background/discovery/ServerDiscoveryService.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/response/ObjectResultDTO.java b/powerjob-common/src/main/java/tech/powerjob/common/response/ObjectResultDTO.java new file mode 100644 index 00000000..b4a5a4dd --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/response/ObjectResultDTO.java @@ -0,0 +1,10 @@ +package tech.powerjob.common.response; + +/** + * 主要目的:消除 idea 烦人的类型提示 + * + * @author tjq + * @since 2023/9/2 + */ +public class ObjectResultDTO extends ResultDTO { +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index bbe682d2..6c0e36c1 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -4,11 +4,7 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import tech.powerjob.common.PowerJobDKey; -import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.common.response.ResultDTO; -import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CommonUtils; -import tech.powerjob.common.utils.HttpUtils; import tech.powerjob.common.utils.NetUtils; import tech.powerjob.common.utils.PropertyUtils; import tech.powerjob.remote.framework.base.Address; @@ -21,8 +17,10 @@ import tech.powerjob.worker.actors.ProcessorTrackerActor; import tech.powerjob.worker.actors.TaskTrackerActor; import tech.powerjob.worker.actors.WorkerActor; import tech.powerjob.worker.background.OmsLogHandler; -import tech.powerjob.worker.background.ServerDiscoveryService; import tech.powerjob.worker.background.WorkerHealthReporter; +import tech.powerjob.worker.background.discovery.AppInfo; +import tech.powerjob.worker.background.discovery.PowerJobServerDiscoveryService; +import tech.powerjob.worker.background.discovery.ServerDiscoveryService; import tech.powerjob.worker.common.PowerBannerPrinter; import tech.powerjob.worker.common.PowerJobWorkerConfig; import tech.powerjob.worker.common.WorkerRuntime; @@ -36,7 +34,6 @@ import tech.powerjob.worker.processor.impl.JarContainerProcessorFactory; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -72,14 +69,14 @@ public class PowerJobWorker { PowerJobWorkerConfig config = workerRuntime.getWorkerConfig(); CommonUtils.requireNonNull(config, "can't find PowerJobWorkerConfig, please set PowerJobWorkerConfig first"); + ServerDiscoveryService serverDiscoveryService = new PowerJobServerDiscoveryService(config); + workerRuntime.setServerDiscoveryService(serverDiscoveryService); + try { PowerBannerPrinter.print(); // 校验 appName - if (!config.isEnableTestMode()) { - assertAppName(); - } else { - log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env."); - } + AppInfo appInfo = serverDiscoveryService.assertApp(); + workerRuntime.setAppInfo(appInfo); // 初始化网络数据,区别对待上报地址和本机绑定地址(对外统一使用上报地址) String localBindIp = NetUtils.getLocalHost(); @@ -113,10 +110,7 @@ public class PowerJobWorker { workerRuntime.setTransporter(engineOutput.getTransporter()); // 连接 server - ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig()); - - serverDiscoveryService.start(workerRuntime.getExecutorManager().getCoreExecutor()); - workerRuntime.setServerDiscoveryService(serverDiscoveryService); + serverDiscoveryService.timingCheck(workerRuntime.getExecutorManager().getCoreExecutor()); log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully."); @@ -142,38 +136,6 @@ public class PowerJobWorker { } } - @SuppressWarnings("rawtypes") - private void assertAppName() { - - PowerJobWorkerConfig config = workerRuntime.getWorkerConfig(); - String appName = config.getAppName(); - Objects.requireNonNull(appName, "appName can't be empty!"); - - String url = "http://%s/server/assert?appName=%s"; - for (String server : config.getServerAddress()) { - String realUrl = String.format(url, server, appName); - try { - String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl)); - ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class); - if (resultDTO.isSuccess()) { - Long appId = Long.valueOf(resultDTO.getData().toString()); - log.info("[PowerJobWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId); - workerRuntime.setAppId(appId); - return; - }else { - log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName); - throw new PowerJobException(resultDTO.getMessage()); - } - }catch (PowerJobException oe) { - throw oe; - }catch (Exception ignore) { - log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", realUrl); - } - } - log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress()); - throw new PowerJobException("no server available!"); - } - private ProcessorLoader buildProcessorLoader(WorkerRuntime runtime) { List customPF = Optional.ofNullable(runtime.getWorkerConfig().getProcessorFactoryList()).orElse(Collections.emptyList()); List finalPF = Lists.newArrayList(customPF); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java index 5ea219cc..a9ce0413 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java @@ -8,6 +8,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Queues; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import tech.powerjob.worker.background.discovery.ServerDiscoveryService; import tech.powerjob.worker.common.utils.TransportUtils; import java.util.List; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/discovery/AppInfo.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/discovery/AppInfo.java new file mode 100644 index 00000000..52a23d97 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/discovery/AppInfo.java @@ -0,0 +1,22 @@ +package tech.powerjob.worker.background.discovery; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +/** + * 应用信息 + * + * @author tjq + * @since 2023/9/2 + */ +@Data +@NoArgsConstructor +@Accessors(chain = true) +public class AppInfo { + + /** + * 应用唯一 ID + */ + private Long appId; +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/discovery/PowerJobServerDiscoveryService.java similarity index 58% rename from powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java rename to powerjob-worker/src/main/java/tech/powerjob/worker/background/discovery/PowerJobServerDiscoveryService.java index e7e1752c..11db7074 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/discovery/PowerJobServerDiscoveryService.java @@ -1,13 +1,15 @@ -package tech.powerjob.worker.background; +package tech.powerjob.worker.background.discovery; import com.google.common.base.Joiner; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import tech.powerjob.common.OmsConstant; +import tech.powerjob.common.exception.ImpossibleException; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.request.ServerDiscoveryRequest; -import tech.powerjob.common.response.ResultDTO; +import tech.powerjob.common.response.ObjectResultDTO; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; @@ -18,6 +20,7 @@ import tech.powerjob.worker.core.tracker.task.heavy.HeavyTaskTracker; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -25,13 +28,12 @@ import java.util.concurrent.TimeUnit; * 服务发现 * * @author tjq - * @since 2020/4/6 + * @since 2023/9/2 */ @Slf4j -public class ServerDiscoveryService { +public class PowerJobServerDiscoveryService implements ServerDiscoveryService { - private final Long appId; - private final PowerJobWorkerConfig config; + private final AppInfo appInfo = new AppInfo(); private String currentServerAddress; @@ -41,6 +43,8 @@ public class ServerDiscoveryService { * 服务发现地址 */ private static final String DISCOVERY_URL = "http://%s/server/acquire?%s"; + + private static final String ASSERT_URL = "http://%s/server/assert?appName=%s"; /** * 失败次数 */ @@ -50,13 +54,74 @@ public class ServerDiscoveryService { */ private static final int MAX_FAILED_COUNT = 3; + private final PowerJobWorkerConfig config; - public ServerDiscoveryService(Long appId, PowerJobWorkerConfig config) { - this.appId = appId; + public PowerJobServerDiscoveryService(PowerJobWorkerConfig config) { this.config = config; } - public void start(ScheduledExecutorService timingPool) { + @Override + public AppInfo assertApp() { + try { + return assertApp0(); + } catch (Exception e) { + if (config.isEnableTestMode()) { + log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env."); + + // 返回引用,方便后续更新对象内属性 + return appInfo; + } + ExceptionUtils.rethrow(e); + } + throw new ImpossibleException(); + } + + private AppInfo assertApp0() { + String appName = config.getAppName(); + Objects.requireNonNull(appName, "appName can't be empty!"); + + for (String server : config.getServerAddress()) { + String realUrl = String.format(ASSERT_URL, server, appName); + try { + String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl)); + ObjectResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ObjectResultDTO.class); + if (resultDTO.isSuccess()) { + + Object resultDataContent = resultDTO.getData(); + log.info("[PowerJobWorker] assert appName({}) succeed, result from server is: {}.", appName, resultDataContent); + // 兼容老版本,响应为数字 + if (StringUtils.isNumeric(resultDataContent.toString())) { + Long appId = Long.valueOf(resultDataContent.toString()); + this.appInfo.setAppId(appId); + return appInfo; + } + + // 新版本,接口直接下发 AppInfo 内容,后续可扩展安全加密等信息 + AppInfo serverAppInfo = JsonUtils.parseObject(JsonUtils.toJSONString(resultDataContent), AppInfo.class); + appInfo.setAppId(serverAppInfo.getAppId()); + return appInfo; + } else { + log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName); + throw new PowerJobException(resultDTO.getMessage()); + } + } catch (PowerJobException oe) { + throw oe; + } catch (Exception ignore) { + log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", realUrl); + } + } + log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress()); + throw new PowerJobException("no server available!"); + } + + + @Override + public String getCurrentServerAddress() { + return currentServerAddress; + } + + @Override + public void timingCheck(ScheduledExecutorService timingPool) { this.currentServerAddress = discovery(); if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) { throw new PowerJobException("can't find any available server, this worker has been quarantined."); @@ -72,13 +137,18 @@ public class ServerDiscoveryService { , 10, 10, TimeUnit.SECONDS); } - public String getCurrentServerAddress() { - return currentServerAddress; - } - - private String discovery() { + // 只有允许延迟加载模式下,appId 才可能为空。每次服务发现前,都重新尝试获取 appInfo。由于是懒加载链路,此处完全忽略异常 + if (appInfo.getAppId() == null || appInfo.getAppId() < 0) { + try { + assertApp0(); + } catch (Exception e) { + log.warn("[PowerDiscovery] assertAppName in discovery stage failed, msg: {}", e.getMessage()); + return null; + } + } + if (ip2Address.isEmpty()) { config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x)); } @@ -131,7 +201,7 @@ public class ServerDiscoveryService { } } - @SuppressWarnings("rawtypes") + private String acquire(String httpServerAddress) { String result = null; String url = buildServerDiscoveryUrl(httpServerAddress); @@ -141,7 +211,7 @@ public class ServerDiscoveryService { } if (!StringUtils.isEmpty(result)) { try { - ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class); + ObjectResultDTO resultDTO = JsonUtils.parseObject(result, ObjectResultDTO.class); if (resultDTO.isSuccess()) { return resultDTO.getData().toString(); } @@ -154,7 +224,7 @@ public class ServerDiscoveryService { private String buildServerDiscoveryUrl(String address) { ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest() - .setAppId(appId) + .setAppId(appInfo.getAppId()) .setCurrentServer(currentServerAddress) .setProtocol(config.getProtocol().name().toUpperCase()); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/discovery/ServerDiscoveryService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/discovery/ServerDiscoveryService.java new file mode 100644 index 00000000..4be3c5ea --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/discovery/ServerDiscoveryService.java @@ -0,0 +1,30 @@ +package tech.powerjob.worker.background.discovery; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * 服务发现 + * + * @author tjq + * @since 2023/9/2 + */ +public interface ServerDiscoveryService { + + /** + * 鉴权 & 附带信息下发 + * @return appInfo + */ + AppInfo assertApp(); + + /** + * 获取当前的 server 地址 + * @return server 地址 + */ + String getCurrentServerAddress(); + + /** + * 定时检查 + * @param timingPool timingPool + */ + void timingCheck(ScheduledExecutorService timingPool); +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/WorkerRuntime.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/WorkerRuntime.java index 19524e6f..eb929d0a 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/WorkerRuntime.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/WorkerRuntime.java @@ -3,11 +3,14 @@ package tech.powerjob.worker.common; import lombok.Data; import tech.powerjob.remote.framework.transporter.Transporter; import tech.powerjob.worker.background.OmsLogHandler; -import tech.powerjob.worker.background.ServerDiscoveryService; +import tech.powerjob.worker.background.discovery.AppInfo; +import tech.powerjob.worker.background.discovery.ServerDiscoveryService; import tech.powerjob.worker.core.executor.ExecutorManager; import tech.powerjob.worker.persistence.TaskPersistenceService; import tech.powerjob.worker.processor.ProcessorLoader; +import java.util.Optional; + /** * store worker's runtime * @@ -17,7 +20,10 @@ import tech.powerjob.worker.processor.ProcessorLoader; @Data public class WorkerRuntime { - private Long appId; + /** + * App 基础信息 + */ + private AppInfo appInfo; /** * 当前执行器地址 */ @@ -42,4 +48,8 @@ public class WorkerRuntime { private ServerDiscoveryService serverDiscoveryService; private TaskPersistenceService taskPersistenceService; + + public Long getAppId() { + return Optional.ofNullable(appInfo.getAppId()).orElse(-1L); + } }