feat: support serverInfoAware

This commit is contained in:
tjq 2022-09-12 12:56:12 +08:00
parent 5450ac00db
commit 74f70cd58b
14 changed files with 135 additions and 89 deletions

View File

@ -0,0 +1,10 @@
package tech.powerjob.server.common.aware;
/**
* PowerJobAware
*
* @author tjq
* @since 2022/9/12
*/
public interface PowerJobAware {
}

View File

@ -0,0 +1,14 @@
package tech.powerjob.server.common.aware;
import tech.powerjob.server.common.module.ServerInfo;
/**
* notify server info
*
* @author tjq
* @since 2022/9/12
*/
public interface ServerInfoAware extends PowerJobAware {
void setServerInfo(ServerInfo serverInfo);
}

View File

@ -0,0 +1,19 @@
package tech.powerjob.server.common.module;
import lombok.Data;
/**
* current server info
*
* @author tjq
* @since 2022/9/12
*/
@Data
public class ServerInfo {
private Long id;
private String ip;
private String version = "UNKNOWN";
}

View File

@ -1,9 +1,9 @@
package tech.powerjob.server.core.uid; package tech.powerjob.server.core.uid;
import tech.powerjob.server.remote.server.ServerInfoService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import tech.powerjob.server.remote.server.self.ServerInfoService;
/** /**
* 唯一ID生成服务使用 Twitter snowflake 算法 * 唯一ID生成服务使用 Twitter snowflake 算法
@ -23,7 +23,7 @@ public class IdGenerateService {
@Autowired @Autowired
public IdGenerateService(ServerInfoService serverInfoService) { public IdGenerateService(ServerInfoService serverInfoService) {
long id = serverInfoService.getServerId(); long id = serverInfoService.fetchServiceInfo().getId();
snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id); snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);
log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id); log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id);
} }

View File

@ -10,9 +10,8 @@ public interface Monitor {
/** /**
* 全局上下文绑定 & 初始化 * 全局上下文绑定 & 初始化
* @param monitorContext 日志上下文
*/ */
void init(MonitorContext monitorContext); void init();
/** /**
* 记录监控事件 * 记录监控事件
* 请注意该方法务必异步不阻塞 * 请注意该方法务必异步不阻塞

View File

@ -1,24 +0,0 @@
package tech.powerjob.server.monitor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* 日志全局上下文
*
* @author tjq
* @since 2022/9/10
*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class MonitorContext implements Serializable {
private long serverId;
private String serverAddress;
}

View File

@ -1,16 +1,10 @@
package tech.powerjob.server.initializer; package tech.powerjob.server.monitor;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import tech.powerjob.server.monitor.Event;
import tech.powerjob.server.monitor.Monitor;
import tech.powerjob.server.monitor.MonitorContext;
import tech.powerjob.server.monitor.MonitorService;
import tech.powerjob.server.remote.server.ServerInfoService;
import javax.annotation.Resource;
import java.util.List; import java.util.List;
/** /**
@ -23,20 +17,13 @@ import java.util.List;
@Component @Component
public class PowerJobMonitorService implements MonitorService { public class PowerJobMonitorService implements MonitorService {
@Resource
private ServerInfoService service;
private final List<Monitor> monitors = Lists.newLinkedList(); private final List<Monitor> monitors = Lists.newLinkedList();
@Autowired @Autowired
public PowerJobMonitorService(List<Monitor> monitors) { public PowerJobMonitorService(List<Monitor> monitors) {
MonitorContext monitorContext = new MonitorContext().setServerId(service.getServerId()).setServerAddress(service.getServerIp());
log.info("[MonitorService] use monitor context: {}", monitorContext);
monitors.forEach(m -> { monitors.forEach(m -> {
log.info("[MonitorService] register monitor: {}", m.getClass().getName()); log.info("[MonitorService] register monitor: {}", m.getClass().getName());
m.init(monitorContext);
this.monitors.add(m); this.monitors.add(m);
}); });
} }

View File

@ -3,9 +3,10 @@ package tech.powerjob.server.monitor.monitors;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.MDC; import org.slf4j.MDC;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import tech.powerjob.server.common.aware.ServerInfoAware;
import tech.powerjob.server.common.module.ServerInfo;
import tech.powerjob.server.monitor.Event; import tech.powerjob.server.monitor.Event;
import tech.powerjob.server.monitor.Monitor; import tech.powerjob.server.monitor.Monitor;
import tech.powerjob.server.monitor.MonitorContext;
/** /**
* 系统默认实现基于日志的监控监视器 * 系统默认实现基于日志的监控监视器
@ -15,22 +16,30 @@ import tech.powerjob.server.monitor.MonitorContext;
* @since 2022/9/6 * @since 2022/9/6
*/ */
@Component @Component
public class LogMonitor implements Monitor { public class LogMonitor implements Monitor, ServerInfoAware {
private MonitorContext monitorContext; /**
* server 启动依赖 DBDB会被 monitor因此最初的几条 log serverInfo 一定为空在此处简单防空
*/
private ServerInfo serverInfo = new ServerInfo();
private static final String MDC_KEY_SERVER_ID = "serverId"; private static final String MDC_KEY_SERVER_ID = "serverId";
private static final String MDC_KEY_SERVER_ADDRESS = "serverAddress"; private static final String MDC_KEY_SERVER_ADDRESS = "serverAddress";
@Override @Override
public void init(MonitorContext monitorContext) { public void init() {
this.monitorContext = monitorContext;
} }
@Override @Override
public void record(Event event) { public void record(Event event) {
MDC.put(MDC_KEY_SERVER_ID, String.valueOf(monitorContext.getServerId())); MDC.put(MDC_KEY_SERVER_ID, String.valueOf(serverInfo.getId()));
MDC.put(MDC_KEY_SERVER_ADDRESS, monitorContext.getServerAddress()); MDC.put(MDC_KEY_SERVER_ADDRESS, serverInfo.getIp());
LoggerFactory.getLogger(event.type()).info(event.message()); LoggerFactory.getLogger(event.type()).info(event.message());
} }
@Override
public void setServerInfo(ServerInfo serverInfo) {
this.serverInfo = serverInfo;
}
} }

View File

@ -0,0 +1,15 @@
package tech.powerjob.server.remote.server.self;
import tech.powerjob.server.common.module.ServerInfo;
/**
* ServerInfoService
*
* @author tjq
* @since 2022/9/12
*/
public interface ServerInfoService {
ServerInfo fetchServiceInfo();
}

View File

@ -1,10 +1,11 @@
package tech.powerjob.server.remote.server; package tech.powerjob.server.remote.server.self;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.info.BuildProperties; import org.springframework.boot.info.BuildProperties;
import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils; import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.module.ServerInfo;
import tech.powerjob.server.extension.LockService; import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.remote.model.ServerInfoDO; import tech.powerjob.server.persistence.remote.model.ServerInfoDO;
import tech.powerjob.server.persistence.remote.repository.ServerInfoRepository; import tech.powerjob.server.persistence.remote.repository.ServerInfoRepository;
@ -28,37 +29,25 @@ import java.util.stream.Collectors;
*/ */
@Slf4j @Slf4j
@Service @Service
public class ServerInfoService { public class ServerInfoServiceImpl implements ServerInfoService {
private final String ip; private final ServerInfo serverInfo;
private final long serverId;
private final ServerInfoRepository serverInfoRepository; private final ServerInfoRepository serverInfoRepository;
private String version = "UNKNOWN";
private static final long MAX_SERVER_CLUSTER_SIZE = 10000; private static final long MAX_SERVER_CLUSTER_SIZE = 10000;
private static final String SERVER_INIT_LOCK = "server_init_lock"; private static final String SERVER_INIT_LOCK = "server_init_lock";
private static final int SERVER_INIT_LOCK_MAX_TIME = 15000; private static final int SERVER_INIT_LOCK_MAX_TIME = 15000;
public long getServerId() {
return serverId;
}
public String getServerIp() {
return ip;
}
public String getServerVersion() {
return version;
}
@Autowired @Autowired
public ServerInfoService(LockService lockService, ServerInfoRepository serverInfoRepository) { public ServerInfoServiceImpl(LockService lockService, ServerInfoRepository serverInfoRepository) {
this.ip = NetUtils.getLocalHost(); this.serverInfo = new ServerInfo();
String ip = NetUtils.getLocalHost();
serverInfo.setIp(ip);
this.serverInfoRepository = serverInfoRepository; this.serverInfoRepository = serverInfoRepository;
Stopwatch sw = Stopwatch.createStarted(); Stopwatch sw = Stopwatch.createStarted();
@ -80,10 +69,11 @@ public class ServerInfoService {
} }
if (server.getId() < MAX_SERVER_CLUSTER_SIZE) { if (server.getId() < MAX_SERVER_CLUSTER_SIZE) {
this.serverId = server.getId(); serverInfo.setId(server.getId());
} else { } else {
this.serverId = retryServerId(); long retryServerId = retryServerId();
serverInfoRepository.updateIdByIp(this.serverId, ip); serverInfo.setId(retryServerId);
serverInfoRepository.updateIdByIp(retryServerId, ip);
} }
} catch (Exception e) { } catch (Exception e) {
@ -93,12 +83,12 @@ public class ServerInfoService {
lockService.unlock(SERVER_INIT_LOCK); lockService.unlock(SERVER_INIT_LOCK);
} }
log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverId, sw); log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverInfo.getId(), sw);
} }
@Scheduled(fixedRate = 15000, initialDelay = 15000) @Scheduled(fixedRate = 15000, initialDelay = 15000)
public void heartbeat() { public void heartbeat() {
serverInfoRepository.updateGmtModifiedByIp(ip, new Date()); serverInfoRepository.updateGmtModifiedByIp(serverInfo.getIp(), new Date());
} }
@ -142,7 +132,12 @@ public class ServerInfoService {
} }
String pomVersion = buildProperties.getVersion(); String pomVersion = buildProperties.getVersion();
if (StringUtils.isNotBlank(pomVersion)) { if (StringUtils.isNotBlank(pomVersion)) {
version = pomVersion; serverInfo.setVersion(pomVersion);
} }
} }
@Override
public ServerInfo fetchServiceInfo() {
return serverInfo;
}
} }

View File

@ -9,7 +9,7 @@ import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2; import springfox.documentation.swagger2.annotations.EnableSwagger2;
import tech.powerjob.server.common.PowerJobServerConfigKey; import tech.powerjob.server.common.PowerJobServerConfigKey;
import tech.powerjob.server.remote.server.ServerInfoService; import tech.powerjob.server.remote.server.self.ServerInfoService;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -39,7 +39,7 @@ public class SwaggerConfig {
.description("Distributed scheduling and computing framework.") .description("Distributed scheduling and computing framework.")
.license("Apache Licence 2") .license("Apache Licence 2")
.termsOfServiceUrl("https://github.com/PowerJob/PowerJob") .termsOfServiceUrl("https://github.com/PowerJob/PowerJob")
.version(serverInfoService.getServerVersion()) .version(serverInfoService.fetchServiceInfo().getVersion())
.build(); .build();
return new Docket(DocumentationType.SWAGGER_2) return new Docket(DocumentationType.SWAGGER_2)

View File

@ -0,0 +1,29 @@
package tech.powerjob.server.support;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.server.common.aware.ServerInfoAware;
import tech.powerjob.server.common.module.ServerInfo;
import tech.powerjob.server.remote.server.self.ServerInfoService;
import java.util.List;
/**
* ServerInfoAwareProcessor
*
* @author tjq
* @since 2022/9/12
*/
@Slf4j
@Component
public class ServerInfoAwareProcessor {
public ServerInfoAwareProcessor(ServerInfoService serverInfoService, List<ServerInfoAware> awareList) {
final ServerInfo serverInfo = serverInfoService.fetchServiceInfo();
log.info("[ServerInfoAwareProcessor] current server info: {}", serverInfo);
awareList.forEach(aware -> {
aware.setServerInfo(serverInfo);
log.info("[ServerInfoAwareProcessor] set ServerInfo for: {} successfully", aware);
});
}
}

View File

@ -4,9 +4,10 @@ import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.OmsConstant; import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.response.ResultDTO; import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.module.ServerInfo;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.server.ServerInfoService; import tech.powerjob.server.remote.server.self.ServerInfoService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService; import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.web.response.SystemOverviewVO; import tech.powerjob.server.web.response.SystemOverviewVO;
@ -70,8 +71,7 @@ public class SystemInfoController {
// 服务器时间 // 服务器时间
overview.setServerTime(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN)); overview.setServerTime(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN));
SystemOverviewVO.CurrentServerInfo info = new SystemOverviewVO.CurrentServerInfo(serverInfoService.getServerId(), serverInfoService.getServerIp(), serverInfoService.getServerVersion()); overview.setServerInfo(serverInfoService.fetchServiceInfo());
overview.setCurrentServerInfo(info);
return ResultDTO.success(overview); return ResultDTO.success(overview);
} }

View File

@ -3,6 +3,7 @@ package tech.powerjob.server.web.response;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.Getter; import lombok.Getter;
import tech.powerjob.server.common.module.ServerInfo;
/** /**
* 系统概览 * 系统概览
@ -21,13 +22,5 @@ public class SystemOverviewVO {
// 服务器时间 // 服务器时间
private String serverTime; private String serverTime;
private CurrentServerInfo currentServerInfo; private ServerInfo serverInfo;
@Getter
@AllArgsConstructor
public static class CurrentServerInfo {
private final long id;
private final String ip;
private final String version;
}
} }