[dev] finished all container web interface

This commit is contained in:
tjq 2020-05-19 08:46:53 +08:00
parent babd5c362f
commit a14f554e00
16 changed files with 164 additions and 20 deletions

View File

@ -0,0 +1,27 @@
package com.github.kfcfans.oms.common.model;
import com.github.kfcfans.oms.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 已部署的容器信息
*
* @author tjq
* @since 2020/5/18
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DeployedContainerInfo implements OmsSerializable {
// 容器ID
private Long containerId;
// 版本
private String version;
// 部署时间
private long deployedTime;
// 机器地址无需上报
private String workerAddress;
}

View File

@ -16,6 +16,10 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor @AllArgsConstructor
public class ServerDeployContainerRequest implements OmsSerializable { public class ServerDeployContainerRequest implements OmsSerializable {
/**
* 容器ID
*/
private Long containerId;
/** /**
* 容器名称 * 容器名称
*/ */

View File

@ -1,9 +1,12 @@
package com.github.kfcfans.oms.common.request; package com.github.kfcfans.oms.common.request;
import com.github.kfcfans.oms.common.OmsSerializable; import com.github.kfcfans.oms.common.OmsSerializable;
import com.github.kfcfans.oms.common.model.DeployedContainerInfo;
import com.github.kfcfans.oms.common.model.SystemMetrics; import com.github.kfcfans.oms.common.model.SystemMetrics;
import lombok.Data; import lombok.Data;
import java.util.List;
/** /**
* Worker 上报健康信息worker定时发送的heartbeat * Worker 上报健康信息worker定时发送的heartbeat
@ -22,6 +25,8 @@ public class WorkerHeartbeat implements OmsSerializable {
private Long appId; private Long appId;
// 当前时间 // 当前时间
private long heartbeatTime; private long heartbeatTime;
// 当前加载的容器容器名称 -> 容器版本
private List<DeployedContainerInfo> containerInfos;
private SystemMetrics systemMetrics; private SystemMetrics systemMetrics;
} }

View File

@ -96,6 +96,7 @@ public class ServerActor extends AbstractActor {
ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest(); ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest();
BeanUtils.copyProperties(containerInfo, dpReq); BeanUtils.copyProperties(containerInfo, dpReq);
dpReq.setContainerId(containerInfo.getId());
String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, containerInfo.getVersion()); String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, containerInfo.getVersion());
dpReq.setDownloadURL(downloadURL); dpReq.setDownloadURL(downloadURL);

View File

@ -13,6 +13,9 @@ import java.util.concurrent.ThreadPoolExecutor;
/** /**
* 公用线程池配置 * 公用线程池配置
* omsTimingPool用于执行定时任务的线程池
* omsCommonPool用于执行普通任务的线程池
* taskScheduler用于定时调度的线程池
* *
* @author tjq * @author tjq
* @since 2020/4/28 * @since 2020/4/28

View File

@ -6,7 +6,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.io.*; import java.io.*;
import java.net.URL;
import java.util.Objects; import java.util.Objects;
/** /**
@ -31,7 +30,7 @@ public class ContainerTemplateGenerator {
*/ */
public static File generate(String group, String artifact, String name, String packageName, Integer javaVersion) throws IOException { public static File generate(String group, String artifact, String name, String packageName, Integer javaVersion) throws IOException {
String workerDir = OmsFileUtils.genTemporaryPath(); String workerDir = OmsFileUtils.genTemporaryWorkePath();
File originJar = new File(workerDir + "tmp.jar"); File originJar = new File(workerDir + "tmp.jar");
String tmpPath = workerDir + "/unzip/"; String tmpPath = workerDir + "/unzip/";

View File

@ -37,12 +37,20 @@ public class OmsFileUtils {
} }
/** /**
* 获取临时目录用完记得删除 * 获取临时目录固定目录
* @return 临时目录 * @return 目录
*/ */
public static String genTemporaryPath() { public static String genTemporaryPath() {
return COMMON_PATH + "temporary/";
}
/**
* 获取临时目录随机目录不会重复用完记得删除
* @return 临时目录
*/
public static String genTemporaryWorkePath() {
String uuid = StringUtils.replace(UUID.randomUUID().toString(), "-", ""); String uuid = StringUtils.replace(UUID.randomUUID().toString(), "-", "");
return COMMON_PATH + "temporary/" + uuid + "/"; return genTemporaryPath() + uuid + "/";
} }
/** /**

View File

@ -130,7 +130,7 @@ public class ContainerService {
*/ */
public String uploadContainerJarFile(MultipartFile file) throws IOException { public String uploadContainerJarFile(MultipartFile file) throws IOException {
String workerDirStr = OmsFileUtils.genTemporaryPath(); String workerDirStr = OmsFileUtils.genTemporaryWorkePath();
String tmpFileStr = workerDirStr + "tmp.jar"; String tmpFileStr = workerDirStr + "tmp.jar";
File workerDir = new File(workerDirStr); File workerDir = new File(workerDirStr);
@ -238,7 +238,7 @@ public class ContainerService {
String port = environment.getProperty("local.server.port"); String port = environment.getProperty("local.server.port");
String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, container.getVersion()); String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, container.getVersion());
ServerDeployContainerRequest req = new ServerDeployContainerRequest(containerName, container.getVersion(), downloadURL); ServerDeployContainerRequest req = new ServerDeployContainerRequest(container.getId(), containerName, container.getVersion(), downloadURL);
long sleepTime = calculateSleepTime(jarFile.length()); long sleepTime = calculateSleepTime(jarFile.length());
AtomicInteger count = new AtomicInteger(); AtomicInteger count = new AtomicInteger();
@ -267,7 +267,7 @@ public class ContainerService {
ContainerSourceType sourceType = ContainerSourceType.of(container.getSourceType()); ContainerSourceType sourceType = ContainerSourceType.of(container.getSourceType());
if (sourceType == ContainerSourceType.Git) { if (sourceType == ContainerSourceType.Git) {
String workerDirStr = OmsFileUtils.genTemporaryPath(); String workerDirStr = OmsFileUtils.genTemporaryWorkePath();
File workerDir = new File(workerDirStr); File workerDir = new File(workerDirStr);
FileUtils.forceMkdir(workerDir); FileUtils.forceMkdir(workerDir);

View File

@ -1,10 +1,12 @@
package com.github.kfcfans.oms.server.service.ha; package com.github.kfcfans.oms.server.service.ha;
import com.github.kfcfans.oms.common.model.DeployedContainerInfo;
import com.github.kfcfans.oms.common.model.SystemMetrics; import com.github.kfcfans.oms.common.model.SystemMetrics;
import com.github.kfcfans.oms.common.request.WorkerHeartbeat; import com.github.kfcfans.oms.common.request.WorkerHeartbeat;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -22,6 +24,8 @@ public class ClusterStatusHolder {
private String appName; private String appName;
// 集群中所有机器的健康状态 // 集群中所有机器的健康状态
private Map<String, SystemMetrics> address2Metrics; private Map<String, SystemMetrics> address2Metrics;
// 集群中所有机器的容器部署状态
private Map<Long, List<DeployedContainerInfo>> containerId2Infos;
// 集群中所有机器的最后心跳时间 // 集群中所有机器的最后心跳时间
private Map<String, Long> address2ActiveTime; private Map<String, Long> address2ActiveTime;
@ -31,6 +35,7 @@ public class ClusterStatusHolder {
this.appName = appName; this.appName = appName;
address2Metrics = Maps.newConcurrentMap(); address2Metrics = Maps.newConcurrentMap();
address2ActiveTime = Maps.newConcurrentMap(); address2ActiveTime = Maps.newConcurrentMap();
containerId2Infos = Maps.newConcurrentMap();
} }
/** /**
@ -41,10 +46,23 @@ public class ClusterStatusHolder {
String workerAddress = heartbeat.getWorkerAddress(); String workerAddress = heartbeat.getWorkerAddress();
long heartbeatTime = heartbeat.getHeartbeatTime(); long heartbeatTime = heartbeat.getHeartbeatTime();
address2Metrics.put(workerAddress, heartbeat.getSystemMetrics());
Long oldTime = address2ActiveTime.getOrDefault(workerAddress, -1L); Long oldTime = address2ActiveTime.getOrDefault(workerAddress, -1L);
if (heartbeatTime > oldTime) { if (heartbeatTime < oldTime) {
address2ActiveTime.put(workerAddress, heartbeatTime); log.warn("[ClusterStatusHolder] receive the old heartbeat: {}.", heartbeat);
return;
}
address2ActiveTime.put(workerAddress, heartbeatTime);
address2Metrics.put(workerAddress, heartbeat.getSystemMetrics());
List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
if (!CollectionUtils.isEmpty(containerInfos)) {
containerInfos.forEach(containerInfo -> {
List<DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Lists.newLinkedList());
// 设置机器地址
containerInfo.setWorkerAddress(heartbeat.getWorkerAddress());
infos.add(containerInfo);
});
} }
} }
@ -97,6 +115,15 @@ public class ClusterStatusHolder {
return res; return res;
} }
/**
* 获取当前该Worker集群容器的部署情况
* @param containerId 容器ID
* @return 该容器的部署情况
*/
public List<DeployedContainerInfo> getDeployedContainerInfos(Long containerId) {
return containerId2Infos.getOrDefault(containerId, Lists.newLinkedList());
}
private boolean timeout(String address) { private boolean timeout(String address) {
// 排除超时机器 // 排除超时机器
Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L); Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L);

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.server.service.ha; package com.github.kfcfans.oms.server.service.ha;
import com.github.kfcfans.oms.common.model.DeployedContainerInfo;
import com.github.kfcfans.oms.common.model.SystemMetrics; import com.github.kfcfans.oms.common.model.SystemMetrics;
import com.github.kfcfans.oms.common.request.WorkerHeartbeat; import com.github.kfcfans.oms.common.request.WorkerHeartbeat;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -78,4 +79,18 @@ public class WorkerManagerService {
return clusterStatusHolder.getActiveWorkerInfo(); return clusterStatusHolder.getActiveWorkerInfo();
} }
/**
* 获取某个应用容器的部署情况
* @param appId 应用ID
* @param containerId 容器ID
* @return 部署情况
*/
public static List<DeployedContainerInfo> getDeployedContainerInfos(Long appId, Long containerId) {
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
if (clusterStatusHolder == null) {
return Collections.emptyList();
}
return clusterStatusHolder.getDeployedContainerInfos(containerId);
}
} }

View File

@ -35,6 +35,8 @@ public class CleanService {
@Value("${oms.container.retention.remote}") @Value("${oms.container.retention.remote}")
private int remoteContainerRetentionDay; private int remoteContainerRetentionDay;
private static final int TEMPORARY_RETENTION_DAY = 3;
// 每天凌晨3点定时清理 // 每天凌晨3点定时清理
private static final String CLEAN_TIME_EXPRESSION = "0 0 3 * * ?"; private static final String CLEAN_TIME_EXPRESSION = "0 0 3 * * ?";
@ -44,6 +46,7 @@ public class CleanService {
public void timingClean() { public void timingClean() {
cleanLocal(OmsFileUtils.genLogDirPath(), localLogRetentionDay); cleanLocal(OmsFileUtils.genLogDirPath(), localLogRetentionDay);
cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay); cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay);
cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY);
cleanRemote(GridFsManager.LOG_BUCKET, remoteLogRetentionDay); cleanRemote(GridFsManager.LOG_BUCKET, remoteLogRetentionDay);
cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay); cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay);

View File

@ -1,17 +1,22 @@
package com.github.kfcfans.oms.server.web.controller; package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.common.model.DeployedContainerInfo;
import com.github.kfcfans.oms.common.response.ResultDTO; import com.github.kfcfans.oms.common.response.ResultDTO;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.common.utils.ContainerTemplateGenerator; import com.github.kfcfans.oms.server.common.utils.ContainerTemplateGenerator;
import com.github.kfcfans.oms.server.common.utils.OmsFileUtils; import com.github.kfcfans.oms.server.common.utils.OmsFileUtils;
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.ContainerInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.persistence.core.repository.ContainerInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.ContainerInfoRepository;
import com.github.kfcfans.oms.server.service.ContainerService; import com.github.kfcfans.oms.server.service.ContainerService;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import com.github.kfcfans.oms.server.web.request.GenerateContainerTemplateRequest; import com.github.kfcfans.oms.server.web.request.GenerateContainerTemplateRequest;
import com.github.kfcfans.oms.server.web.request.SaveContainerInfoRequest; import com.github.kfcfans.oms.server.web.request.SaveContainerInfoRequest;
import com.github.kfcfans.oms.server.web.response.ContainerInfoVO; import com.github.kfcfans.oms.server.web.response.ContainerInfoVO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
@ -32,11 +37,15 @@ import java.util.stream.Collectors;
@RequestMapping("/container") @RequestMapping("/container")
public class ContainerController { public class ContainerController {
@Resource @Value("${server.port}")
private ContainerInfoRepository containerInfoRepository; private int port;
@Resource @Resource
private ContainerService containerService; private ContainerService containerService;
@Resource
private AppInfoRepository appInfoRepository;
@Resource
private ContainerInfoRepository containerInfoRepository;
@GetMapping("/downloadJar") @GetMapping("/downloadJar")
public void downloadJar(String version, HttpServletResponse response) throws IOException { public void downloadJar(String version, HttpServletResponse response) throws IOException {
@ -79,10 +88,22 @@ public class ContainerController {
} }
@GetMapping("/listDeployedWorker") @GetMapping("/listDeployedWorker")
public ResultDTO<List<String>> listDeployedWorker(Long appId, Long containerId) { public ResultDTO<List<DeployedContainerInfo>> listDeployedWorker(Long appId, Long containerId, HttpServletResponse response) {
// TODO本地 ContainerManager 直接返回 AppInfoDO appInfoDO = appInfoRepository.findById(appId).orElseThrow(() -> new IllegalArgumentException("can't find app by id:" + appId));
List<String> mock = Lists.newArrayList("192.168.1.1:9900", "192.168.1.1:9901"); String targetServer = appInfoDO.getCurrentServer();
return ResultDTO.success(mock);
// 转发 HTTP 请求
if (!OhMyServer.getActorSystemAddress().equals(targetServer)) {
String targetIp = targetServer.split(":")[0];
String url = String.format("http://%s:%d/container/listDeployedWorker?appId=%d&containerId=%d", targetIp, port, appId, containerId);
try {
response.sendRedirect(url);
return ResultDTO.success(null);
}catch (Exception e) {
return ResultDTO.failed(e);
}
}
return ResultDTO.success(WorkerManagerService.getDeployedContainerInfos(appId, containerId));
} }
private static ContainerInfoVO convert(ContainerInfoDO containerInfoDO) { private static ContainerInfoVO convert(ContainerInfoDO containerInfoDO) {

View File

@ -7,6 +7,7 @@ import com.github.kfcfans.oms.common.request.WorkerHeartbeat;
import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.SystemInfoUtils; import com.github.kfcfans.oms.worker.common.utils.SystemInfoUtils;
import com.github.kfcfans.oms.worker.container.OmsContainerFactory;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -39,6 +40,9 @@ public class WorkerHealthReporter implements Runnable {
heartbeat.setAppId(OhMyWorker.getAppId()); heartbeat.setAppId(OhMyWorker.getAppId());
heartbeat.setHeartbeatTime(System.currentTimeMillis()); heartbeat.setHeartbeatTime(System.currentTimeMillis());
// 获取当前加载的容器列表
heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos());
// 发送请求 // 发送请求
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
if (StringUtils.isEmpty(serverPath)) { if (StringUtils.isEmpty(serverPath)) {

View File

@ -17,6 +17,8 @@ public interface OmsContainer extends LifeCycle {
*/ */
BasicProcessor getProcessor(String className); BasicProcessor getProcessor(String className);
Long getContainerId();
Long getDeployedTime();
String getName(); String getName();
String getVersion(); String getVersion();

View File

@ -3,12 +3,14 @@ package com.github.kfcfans.oms.worker.container;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import com.github.kfcfans.oms.common.RemoteConstant; import com.github.kfcfans.oms.common.RemoteConstant;
import com.github.kfcfans.oms.common.model.DeployedContainerInfo;
import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest; import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.oms.common.request.http.WorkerNeedDeployContainerRequest; import com.github.kfcfans.oms.common.request.http.WorkerNeedDeployContainerRequest;
import com.github.kfcfans.oms.common.response.AskResponse; import com.github.kfcfans.oms.common.response.AskResponse;
import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.OmsWorkerFileUtils; import com.github.kfcfans.oms.worker.common.utils.OmsWorkerFileUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
@ -17,6 +19,7 @@ import org.springframework.util.StringUtils;
import java.io.File; import java.io.File;
import java.net.URL; import java.net.URL;
import java.time.Duration; import java.time.Duration;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -96,7 +99,7 @@ public class OmsContainerFactory {
} }
// 创建新容器 // 创建新容器
OmsContainer newContainer = new OmsJarContainer(containerName, version, jarFile); OmsContainer newContainer = new OmsJarContainer(request.getContainerId(), containerName, version, jarFile);
newContainer.init(); newContainer.init();
// 替换容器 // 替换容器
@ -112,4 +115,14 @@ public class OmsContainerFactory {
log.error("[OmsContainerFactory] deploy container(name={},version={}) failed.", containerName, version, e); log.error("[OmsContainerFactory] deploy container(name={},version={}) failed.", containerName, version, e);
} }
} }
/**
* 获取该Worker已部署容器的信息
* @return 已部署容器信息
*/
public static List<DeployedContainerInfo> getDeployedContainerInfos() {
List<DeployedContainerInfo> info = Lists.newLinkedList();
CARGO.forEach((name, container) -> info.add(new DeployedContainerInfo(container.getContainerId(), container.getVersion(), container.getDeployedTime(), null)));
return info;
}
} }

View File

@ -26,9 +26,11 @@ import java.util.concurrent.atomic.AtomicInteger;
@Slf4j @Slf4j
public class OmsJarContainer implements OmsContainer { public class OmsJarContainer implements OmsContainer {
private final Long containerId;
private final String name; private final String name;
private final String version; private final String version;
private final File localJarFile; private final File localJarFile;
private final Long deployedTime;
// 引用计数器 // 引用计数器
private final AtomicInteger referenceCount = new AtomicInteger(0); private final AtomicInteger referenceCount = new AtomicInteger(0);
@ -38,10 +40,12 @@ public class OmsJarContainer implements OmsContainer {
private Map<String, BasicProcessor> processorCache = Maps.newConcurrentMap(); private Map<String, BasicProcessor> processorCache = Maps.newConcurrentMap();
public OmsJarContainer(String name, String version, File localJarFile) { public OmsJarContainer(Long containerId, String name, String version, File localJarFile) {
this.containerId = containerId;
this.name = name; this.name = name;
this.version = version; this.version = version;
this.localJarFile = localJarFile; this.localJarFile = localJarFile;
this.deployedTime = System.currentTimeMillis();
} }
@Override @Override
@ -168,6 +172,14 @@ public class OmsJarContainer implements OmsContainer {
public String getVersion() { public String getVersion() {
return version; return version;
} }
@Override
public Long getContainerId() {
return containerId;
}
@Override
public Long getDeployedTime() {
return deployedTime;
}
@Override @Override
public void tryRelease() { public void tryRelease() {