feat: support lazy init#725

This commit is contained in:
tjq 2023-09-02 11:53:01 +08:00
parent 5dbceb7ce4
commit a138f9c8cc
7 changed files with 171 additions and 66 deletions

View File

@ -0,0 +1,10 @@
package tech.powerjob.common.response;
/**
* 主要目的消除 idea 烦人的类型提示
*
* @author tjq
* @since 2023/9/2
*/
public class ObjectResultDTO extends ResultDTO<Object> {
}

View File

@ -4,11 +4,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.PropertyUtils;
import tech.powerjob.remote.framework.base.Address;
@ -21,8 +17,10 @@ import tech.powerjob.worker.actors.ProcessorTrackerActor;
import tech.powerjob.worker.actors.TaskTrackerActor;
import tech.powerjob.worker.actors.WorkerActor;
import tech.powerjob.worker.background.OmsLogHandler;
import tech.powerjob.worker.background.ServerDiscoveryService;
import tech.powerjob.worker.background.WorkerHealthReporter;
import tech.powerjob.worker.background.discovery.AppInfo;
import tech.powerjob.worker.background.discovery.PowerJobServerDiscoveryService;
import tech.powerjob.worker.background.discovery.ServerDiscoveryService;
import tech.powerjob.worker.common.PowerBannerPrinter;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.WorkerRuntime;
@ -36,7 +34,6 @@ import tech.powerjob.worker.processor.impl.JarContainerProcessorFactory;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -72,14 +69,14 @@ public class PowerJobWorker {
PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
CommonUtils.requireNonNull(config, "can't find PowerJobWorkerConfig, please set PowerJobWorkerConfig first");
ServerDiscoveryService serverDiscoveryService = new PowerJobServerDiscoveryService(config);
workerRuntime.setServerDiscoveryService(serverDiscoveryService);
try {
PowerBannerPrinter.print();
// 校验 appName
if (!config.isEnableTestMode()) {
assertAppName();
} else {
log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
}
AppInfo appInfo = serverDiscoveryService.assertApp();
workerRuntime.setAppInfo(appInfo);
// 初始化网络数据区别对待上报地址和本机绑定地址对外统一使用上报地址
String localBindIp = NetUtils.getLocalHost();
@ -113,10 +110,7 @@ public class PowerJobWorker {
workerRuntime.setTransporter(engineOutput.getTransporter());
// 连接 server
ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig());
serverDiscoveryService.start(workerRuntime.getExecutorManager().getCoreExecutor());
workerRuntime.setServerDiscoveryService(serverDiscoveryService);
serverDiscoveryService.timingCheck(workerRuntime.getExecutorManager().getCoreExecutor());
log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");
@ -142,38 +136,6 @@ public class PowerJobWorker {
}
}
@SuppressWarnings("rawtypes")
private void assertAppName() {
PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
String appName = config.getAppName();
Objects.requireNonNull(appName, "appName can't be empty!");
String url = "http://%s/server/assert?appName=%s";
for (String server : config.getServerAddress()) {
String realUrl = String.format(url, server, appName);
try {
String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);
if (resultDTO.isSuccess()) {
Long appId = Long.valueOf(resultDTO.getData().toString());
log.info("[PowerJobWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);
workerRuntime.setAppId(appId);
return;
}else {
log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
throw new PowerJobException(resultDTO.getMessage());
}
}catch (PowerJobException oe) {
throw oe;
}catch (Exception ignore) {
log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", realUrl);
}
}
log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress());
throw new PowerJobException("no server available!");
}
private ProcessorLoader buildProcessorLoader(WorkerRuntime runtime) {
List<ProcessorFactory> customPF = Optional.ofNullable(runtime.getWorkerConfig().getProcessorFactoryList()).orElse(Collections.emptyList());
List<ProcessorFactory> finalPF = Lists.newArrayList(customPF);

View File

@ -8,6 +8,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.worker.background.discovery.ServerDiscoveryService;
import tech.powerjob.worker.common.utils.TransportUtils;
import java.util.List;

View File

@ -0,0 +1,22 @@
package tech.powerjob.worker.background.discovery;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
/**
* 应用信息
*
* @author tjq
* @since 2023/9/2
*/
@Data
@NoArgsConstructor
@Accessors(chain = true)
public class AppInfo {
/**
* 应用唯一 ID
*/
private Long appId;
}

View File

@ -1,13 +1,15 @@
package tech.powerjob.worker.background;
package tech.powerjob.worker.background.discovery;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.exception.ImpossibleException;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.request.ServerDiscoveryRequest;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.response.ObjectResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
@ -18,6 +20,7 @@ import tech.powerjob.worker.core.tracker.task.heavy.HeavyTaskTracker;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -25,13 +28,12 @@ import java.util.concurrent.TimeUnit;
* 服务发现
*
* @author tjq
* @since 2020/4/6
* @since 2023/9/2
*/
@Slf4j
public class ServerDiscoveryService {
public class PowerJobServerDiscoveryService implements ServerDiscoveryService {
private final Long appId;
private final PowerJobWorkerConfig config;
private final AppInfo appInfo = new AppInfo();
private String currentServerAddress;
@ -41,6 +43,8 @@ public class ServerDiscoveryService {
* 服务发现地址
*/
private static final String DISCOVERY_URL = "http://%s/server/acquire?%s";
private static final String ASSERT_URL = "http://%s/server/assert?appName=%s";
/**
* 失败次数
*/
@ -50,13 +54,74 @@ public class ServerDiscoveryService {
*/
private static final int MAX_FAILED_COUNT = 3;
private final PowerJobWorkerConfig config;
public ServerDiscoveryService(Long appId, PowerJobWorkerConfig config) {
this.appId = appId;
public PowerJobServerDiscoveryService(PowerJobWorkerConfig config) {
this.config = config;
}
public void start(ScheduledExecutorService timingPool) {
@Override
public AppInfo assertApp() {
try {
return assertApp0();
} catch (Exception e) {
if (config.isEnableTestMode()) {
log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
// 返回引用方便后续更新对象内属性
return appInfo;
}
ExceptionUtils.rethrow(e);
}
throw new ImpossibleException();
}
private AppInfo assertApp0() {
String appName = config.getAppName();
Objects.requireNonNull(appName, "appName can't be empty!");
for (String server : config.getServerAddress()) {
String realUrl = String.format(ASSERT_URL, server, appName);
try {
String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
ObjectResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ObjectResultDTO.class);
if (resultDTO.isSuccess()) {
Object resultDataContent = resultDTO.getData();
log.info("[PowerJobWorker] assert appName({}) succeed, result from server is: {}.", appName, resultDataContent);
// 兼容老版本响应为数字
if (StringUtils.isNumeric(resultDataContent.toString())) {
Long appId = Long.valueOf(resultDataContent.toString());
this.appInfo.setAppId(appId);
return appInfo;
}
// 新版本接口直接下发 AppInfo 内容后续可扩展安全加密等信息
AppInfo serverAppInfo = JsonUtils.parseObject(JsonUtils.toJSONString(resultDataContent), AppInfo.class);
appInfo.setAppId(serverAppInfo.getAppId());
return appInfo;
} else {
log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
throw new PowerJobException(resultDTO.getMessage());
}
} catch (PowerJobException oe) {
throw oe;
} catch (Exception ignore) {
log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", realUrl);
}
}
log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress());
throw new PowerJobException("no server available!");
}
@Override
public String getCurrentServerAddress() {
return currentServerAddress;
}
@Override
public void timingCheck(ScheduledExecutorService timingPool) {
this.currentServerAddress = discovery();
if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) {
throw new PowerJobException("can't find any available server, this worker has been quarantined.");
@ -72,13 +137,18 @@ public class ServerDiscoveryService {
, 10, 10, TimeUnit.SECONDS);
}
public String getCurrentServerAddress() {
return currentServerAddress;
}
private String discovery() {
// 只有允许延迟加载模式下appId 才可能为空每次服务发现前都重新尝试获取 appInfo由于是懒加载链路此处完全忽略异常
if (appInfo.getAppId() == null || appInfo.getAppId() < 0) {
try {
assertApp0();
} catch (Exception e) {
log.warn("[PowerDiscovery] assertAppName in discovery stage failed, msg: {}", e.getMessage());
return null;
}
}
if (ip2Address.isEmpty()) {
config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));
}
@ -131,7 +201,7 @@ public class ServerDiscoveryService {
}
}
@SuppressWarnings("rawtypes")
private String acquire(String httpServerAddress) {
String result = null;
String url = buildServerDiscoveryUrl(httpServerAddress);
@ -141,7 +211,7 @@ public class ServerDiscoveryService {
}
if (!StringUtils.isEmpty(result)) {
try {
ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
ObjectResultDTO resultDTO = JsonUtils.parseObject(result, ObjectResultDTO.class);
if (resultDTO.isSuccess()) {
return resultDTO.getData().toString();
}
@ -154,7 +224,7 @@ public class ServerDiscoveryService {
private String buildServerDiscoveryUrl(String address) {
ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
.setAppId(appId)
.setAppId(appInfo.getAppId())
.setCurrentServer(currentServerAddress)
.setProtocol(config.getProtocol().name().toUpperCase());

View File

@ -0,0 +1,30 @@
package tech.powerjob.worker.background.discovery;
import java.util.concurrent.ScheduledExecutorService;
/**
* 服务发现
*
* @author tjq
* @since 2023/9/2
*/
public interface ServerDiscoveryService {
/**
* 鉴权 & 附带信息下发
* @return appInfo
*/
AppInfo assertApp();
/**
* 获取当前的 server 地址
* @return server 地址
*/
String getCurrentServerAddress();
/**
* 定时检查
* @param timingPool timingPool
*/
void timingCheck(ScheduledExecutorService timingPool);
}

View File

@ -3,11 +3,14 @@ package tech.powerjob.worker.common;
import lombok.Data;
import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.worker.background.OmsLogHandler;
import tech.powerjob.worker.background.ServerDiscoveryService;
import tech.powerjob.worker.background.discovery.AppInfo;
import tech.powerjob.worker.background.discovery.ServerDiscoveryService;
import tech.powerjob.worker.core.executor.ExecutorManager;
import tech.powerjob.worker.persistence.TaskPersistenceService;
import tech.powerjob.worker.processor.ProcessorLoader;
import java.util.Optional;
/**
* store worker's runtime
*
@ -17,7 +20,10 @@ import tech.powerjob.worker.processor.ProcessorLoader;
@Data
public class WorkerRuntime {
private Long appId;
/**
* App 基础信息
*/
private AppInfo appInfo;
/**
* 当前执行器地址
*/
@ -42,4 +48,8 @@ public class WorkerRuntime {
private ServerDiscoveryService serverDiscoveryService;
private TaskPersistenceService taskPersistenceService;
public Long getAppId() {
return Optional.ofNullable(appInfo.getAppId()).orElse(-1L);
}
}