[dev] use websocket to deploy container

This commit is contained in:
tjq 2020-05-17 22:24:40 +08:00
parent f6ba0783c2
commit 7247f41199
19 changed files with 349 additions and 129 deletions

View File

@ -16,6 +16,7 @@ public class RemoteConstant {
public static final String Task_TRACKER_ACTOR_NAME = "task_tracker"; public static final String Task_TRACKER_ACTOR_NAME = "task_tracker";
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker"; public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
public static final String WORKER_ACTOR_NAME = "worker";
public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf"; public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf";

View File

@ -55,6 +55,11 @@
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-web</artifactId>
<version>${springboot.version}</version> <version>${springboot.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId> <artifactId>spring-boot-starter-data-jpa</artifactId>

View File

@ -68,4 +68,9 @@ public class OhMyServer {
String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.Task_TRACKER_ACTOR_NAME); String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.Task_TRACKER_ACTOR_NAME);
return actorSystem.actorSelection(path); return actorSystem.actorSelection(path);
} }
public static ActorSelection getWorkerActor(String address) {
String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.WORKER_ACTOR_NAME);
return actorSystem.actorSelection(path);
}
} }

View File

@ -0,0 +1,31 @@
package com.github.kfcfans.oms.server.common.config;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import javax.websocket.server.ServerEndpointConfig;
/**
* WebSocket 配置
* 解决 SpringBoot WebSocket 无法注入对象@Resource/@Autowired的问题
*
* @author tjq
* @since 2020/5/17
*/
@Component
public class OmsEndpointConfigure extends ServerEndpointConfig.Configurator implements ApplicationContextAware {
private static volatile ApplicationContext context;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
}
@Override
public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {
return context.getBean(clazz);
}
}

View File

@ -3,8 +3,10 @@ package com.github.kfcfans.oms.server.common.config;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -50,4 +52,14 @@ public class ThreadPoolConfig {
return executor; return executor;
} }
// 引入 WebSocket 支持后需要手动初始化调度线程池
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
scheduler.setThreadNamePrefix("omsSchedulerPool-");
scheduler.setDaemon(true);
return scheduler;
}
} }

View File

@ -1,8 +1,11 @@
package com.github.kfcfans.oms.server.common.config; package com.github.kfcfans.oms.server.common.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry; import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/** /**
* CORS * CORS
@ -11,10 +14,16 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
* @since 2020/4/13 * @since 2020/4/13
*/ */
@Configuration @Configuration
@EnableWebSocket
public class WebConfig implements WebMvcConfigurer { public class WebConfig implements WebMvcConfigurer {
@Override @Override
public void addCorsMappings(CorsRegistry registry) { public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**") registry.addMapping("/**")
.allowedMethods("HEAD", "GET", "PUT", "POST", "DELETE", "PATCH"); .allowedMethods("HEAD", "GET", "PUT", "POST", "DELETE", "PATCH");
} }
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
} }

View File

@ -1,8 +1,11 @@
package com.github.kfcfans.oms.server.common.utils; package com.github.kfcfans.oms.server.common.utils;
import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogMetadata;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.data.mongodb.gridfs.GridFsResource; import org.springframework.data.mongodb.gridfs.GridFsResource;
import org.springframework.data.mongodb.gridfs.GridFsTemplate;
import org.springframework.util.DigestUtils;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import java.io.*; import java.io.*;
@ -98,4 +101,32 @@ public class OmsFileUtils {
ExceptionUtils.rethrow(ie); ExceptionUtils.rethrow(ie);
} }
} }
/**
* 将文件保存到 GridFS
* @param gridFsTemplate gridFS操作模版
* @param localFile 本地文件
* @param remoteName 存储名称
* @param metadata 元数据
* @throws IOException 异常
*/
public static void storeFile2GridFS(GridFsTemplate gridFsTemplate, File localFile, String remoteName, Object metadata) throws IOException {
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(localFile))) {
gridFsTemplate.store(bis, remoteName, metadata);
}
}
/**
* 计算文件的 MD5
* @param f 文件
* @return md5
* @throws Exception 异常
*/
public static String md5(File f) throws Exception {
String md5;
try(FileInputStream fis = new FileInputStream(f)) {
md5 = DigestUtils.md5DigestAsHex(fis);
}
return md5;
}
} }

View File

@ -25,12 +25,16 @@ public class OmsLockDO {
private String lockName; private String lockName;
private String ownerIP; private String ownerIP;
// 最长持有锁的时间
private Long maxLockTime;
private Date gmtCreate; private Date gmtCreate;
private Date gmtModified; private Date gmtModified;
public OmsLockDO(String lockName, String ownerIP) { public OmsLockDO(String lockName, String ownerIP, Long maxLockTime) {
this.lockName = lockName; this.lockName = lockName;
this.ownerIP = ownerIP; this.ownerIP = ownerIP;
this.maxLockTime = maxLockTime;
this.gmtCreate = new Date(); this.gmtCreate = new Date();
this.gmtModified = this.gmtCreate; this.gmtModified = this.gmtCreate;
} }

View File

@ -21,10 +21,5 @@ public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> {
@Query(value = "delete from oms_lock where lock_name = ?1", nativeQuery = true) @Query(value = "delete from oms_lock where lock_name = ?1", nativeQuery = true)
int deleteByLockName(String lockName); int deleteByLockName(String lockName);
@Modifying
@Transactional
@Query(value = "delete from oms_lock where lock_name in ?1", nativeQuery = true)
int deleteByLockNames(List<String> lockNames);
OmsLockDO findByLockName(String lockName); OmsLockDO findByLockName(String lockName);
} }

View File

@ -1,12 +1,20 @@
package com.github.kfcfans.oms.server.service; package com.github.kfcfans.oms.server.service;
import akka.actor.ActorSelection;
import com.github.kfcfans.oms.common.model.GitRepoInfo; import com.github.kfcfans.oms.common.model.GitRepoInfo;
import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.oms.common.utils.CommonUtils; import com.github.kfcfans.oms.common.utils.CommonUtils;
import com.github.kfcfans.oms.common.utils.JsonUtils; import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.common.utils.NetUtils;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.actors.ServerActor;
import com.github.kfcfans.oms.server.common.constans.ContainerSourceType; import com.github.kfcfans.oms.server.common.constans.ContainerSourceType;
import com.github.kfcfans.oms.server.common.utils.OmsFileUtils; import com.github.kfcfans.oms.server.common.utils.OmsFileUtils;
import com.github.kfcfans.oms.server.persistence.core.model.ContainerInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.ContainerInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.ContainerInfoRepository;
import com.github.kfcfans.oms.server.service.ha.ClusterStatusHolder;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import com.github.kfcfans.oms.server.service.lock.LockService;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
@ -18,6 +26,7 @@ import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.transport.CredentialsProvider; import org.eclipse.jgit.transport.CredentialsProvider;
import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider; import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.mongodb.gridfs.GridFsResource; import org.springframework.data.mongodb.gridfs.GridFsResource;
import org.springframework.data.mongodb.gridfs.GridFsTemplate; import org.springframework.data.mongodb.gridfs.GridFsTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -25,10 +34,13 @@ import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.websocket.RemoteEndpoint;
import javax.websocket.Session;
import java.io.File; import java.io.File;
import java.io.FilenameFilter;
import java.util.Collection; import java.util.Collection;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* 容器服务 * 容器服务
@ -40,26 +52,33 @@ import java.util.Optional;
@Service @Service
public class ContainerService { public class ContainerService {
@Resource
private Environment environment;
@Resource
private LockService lockService;
@Resource @Resource
private ContainerInfoRepository containerInfoRepository; private ContainerInfoRepository containerInfoRepository;
private GridFsTemplate gridFsTemplate; private GridFsTemplate gridFsTemplate;
// 并发部署的机器数量
private static final int DEPLOY_BATCH_NUM = 50;
/** /**
* 获取构建容器所需要的 Jar 文件 * 获取构建容器所需要的 Jar 文件
* @param md5 Jar文件的MD5值可以由此构建 mongoDB 文件名 * @param filename 文件名称
* @return 本地Jar文件 * @return 本地Jar文件
*/ */
public File fetchContainerJarFile(String md5) { public File fetchContainerJarFile(String filename) {
String jarFileName = OmsFileUtils.genContainerJarPath() + genContainerJarName(md5); String jarFileName = OmsFileUtils.genContainerJarPath() + filename;
File jarFile = new File(jarFileName); File jarFile = new File(jarFileName);
if (jarFile.exists()) { if (jarFile.exists()) {
return jarFile; return jarFile;
} }
if (gridFsTemplate != null) { if (gridFsTemplate != null) {
downloadJarFromGridFS(genContainerJarName(md5), jarFile); downloadJarFromGridFS(filename, jarFile);
} }
return jarFile; return jarFile;
} }
@ -67,17 +86,158 @@ public class ContainerService {
/** /**
* 部署容器 * 部署容器
* @param containerName 容器名称 * @param containerName 容器名称
* @param session WebSocket Session
* @throws Exception 异常
*/ */
public void deploy(String containerName) throws Exception { public void deploy(String containerName, Session session) throws Exception {
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findByContainerName(containerName);
ContainerInfoDO container = containerInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find container by name: "+ containerName));
String deployLock = "containerDeployLock-" + containerName;
RemoteEndpoint.Async remote = session.getAsyncRemote();
// 最长部署时间10分钟
boolean lock = lockService.lock(deployLock, 10 * 60 * 1000);
if (!lock) {
remote.sendText("SYSTEM: acquire deploy lock failed, maybe other user is deploying, please wait until the running deploy task finished.");
return;
}
try {
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findByContainerName(containerName);
if (!containerInfoOpt.isPresent()) {
remote.sendText("SYSTEM: can't find container by name: " + containerName);
return;
}
ContainerInfoDO container = containerInfoOpt.get();
// 准备文件
File jarFile = prepareJarFile(container, session);
double sizeMB = 1.0 * jarFile.length() / FileUtils.ONE_MB;
remote.sendText(String.format("SYSTEM: the jarFile(size=%fMB) is prepared and ready to be deployed to the worker.", sizeMB));
// 开始部署需要分批进行
Set<String> workerAddressList = WorkerManagerService.getActiveWorkerInfo(container.getAppId()).keySet();
if (workerAddressList.isEmpty()) {
remote.sendText("SYSTEM: there is no worker available now, deploy failed!");
return;
}
String port = environment.getProperty("local.server.port");
String downloadURL = String.format("http://%s:%s/container/downloadJar?filename=%s", NetUtils.getLocalHost(), port, jarFile.getName());
ServerDeployContainerRequest req = new ServerDeployContainerRequest(containerName, container.getMd5(), downloadURL);
long sleepTime = calculateSleepTime(jarFile.length());
AtomicInteger count = new AtomicInteger();
workerAddressList.forEach(akkaAddress -> {
ActorSelection workerActor = OhMyServer.getWorkerActor(akkaAddress);
workerActor.tell(req, null);
remote.sendText("SYSTEM: send deploy request to " + akkaAddress);
if (count.incrementAndGet() % DEPLOY_BATCH_NUM == 0) {
CommonUtils.executeIgnoreException(() -> Thread.sleep(sleepTime));
}
});
remote.sendText("SYSTEM: deploy finished, congratulations!");
}finally {
lockService.unlock(deployLock);
}
}
private File prepareJarFile(ContainerInfoDO container, Session session) throws Exception {
RemoteEndpoint.Async remote = session.getAsyncRemote();
// 获取JarGit需要先 clone成Jar计算MD5JarFile则直接下载 // 获取JarGit需要先 clone成Jar计算MD5JarFile则直接下载
ContainerSourceType sourceType = ContainerSourceType.of(container.getSourceType()); ContainerSourceType sourceType = ContainerSourceType.of(container.getSourceType());
if (sourceType == ContainerSourceType.Git) { if (sourceType == ContainerSourceType.Git) {
String workerDirStr = OmsFileUtils.genTemporaryPath();
File workerDir = new File(workerDirStr);
FileUtils.forceMkdir(workerDir);
// git clone
remote.sendText("SYSTEM: start to git clone the code repo, using config: " + container.getSourceInfo());
GitRepoInfo gitRepoInfo = JsonUtils.parseObject(container.getSourceInfo(), GitRepoInfo.class);
CloneCommand cloneCommand = Git.cloneRepository()
.setDirectory(workerDir)
.setURI(gitRepoInfo.getRepo())
.setBranch(gitRepoInfo.getBranch());
if (!StringUtils.isEmpty(gitRepoInfo.getUsername())) {
CredentialsProvider credentialsProvider = new UsernamePasswordCredentialsProvider(gitRepoInfo.getUsername(), gitRepoInfo.getPassword());
cloneCommand.setCredentialsProvider(credentialsProvider);
} }
cloneCommand.call();
// mvn clean package -DskipTests -U
remote.sendText("SYSTEM: git clone successfully, star to compile the project.");
Invoker mvnInvoker = new DefaultInvoker();
InvocationRequest ivkReq = new DefaultInvocationRequest();
ivkReq.setGoals(Lists.newArrayList("clean", "package", "-DskipTests", "-U"));
ivkReq.setBaseDirectory(workerDir);
ivkReq.setOutputHandler(remote::sendText);
InvocationResult mvnResult = mvnInvoker.execute(ivkReq);
if (mvnResult.getExitCode() != 0) {
throw mvnResult.getExecutionException();
}
String targetDirStr = workerDirStr + "/target";
File targetDir = new File(targetDirStr);
IOFileFilter fileFilter = FileFilterUtils.asFileFilter((dir, name) -> name.endsWith("jar-with-dependencies.jar"));
Collection<File> jarFile = FileUtils.listFiles(targetDir, fileFilter, null);
if (CollectionUtils.isEmpty(jarFile)) {
remote.sendText("SYSTEM: can't find packaged jar, deploy failed!");
throw new RuntimeException("can't find packaged jar");
}
File jarWithDependency = jarFile.iterator().next();
String md5 = OmsFileUtils.md5(jarWithDependency);
// 更新 MD5
container.setMd5(md5);
String jarFileName = genContainerJarName(md5);
GridFsResource resource = gridFsTemplate.getResource(jarFileName);
if (!resource.exists()) {
remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version.");
OmsFileUtils.storeFile2GridFS(gridFsTemplate, jarWithDependency, jarFileName, null);
remote.sendText("SYSTEM: upload to GridFS successfully~");
}
// 将文件从临时工作目录移动到正式目录
String localFileStr = OmsFileUtils.genContainerJarPath() + jarFileName;
File localFile = new File(localFileStr);
FileUtils.forceDelete(localFile);
FileUtils.copyFile(jarWithDependency, localFile);
// 删除工作区数据
FileUtils.forceDelete(workerDir);
return localFile;
}
// 先查询本地是否存在目标 Jar 文件
String jarFileName = genContainerJarName(container.getMd5());
String localFileStr = OmsFileUtils.genContainerJarPath() + jarFileName;
File localFile = new File(localFileStr);
if (localFile.exists()) {
remote.sendText("SYSTEM: find the jar file in local disk.");
return localFile;
}
// MongoDB 下载
GridFsResource resource = gridFsTemplate.getResource(jarFileName);
if (!resource.exists()) {
remote.sendText(String.format("SYSTEM: can't find %s in local disk and GridFS, deploy failed!", jarFileName));
throw new RuntimeException("can't find jar");
}
remote.sendText("SYSTEM: start to download jar file from GridFS......");
OmsFileUtils.gridFs2File(resource, localFile);
remote.sendText("SYSTEM: download jar file from GridFS successfully~");
return localFile;
} }
private void downloadJarFromGridFS(String mongoFileName, File targetFile) { private void downloadJarFromGridFS(String mongoFileName, File targetFile) {
@ -108,53 +268,12 @@ public class ContainerService {
this.gridFsTemplate = gridFsTemplate; this.gridFsTemplate = gridFsTemplate;
} }
public static void main(String[] args) throws Exception { /**
* 计算 sleep 时间每10M睡眠1S + 1
String gitRepoInfoStr = "{\"repo\":\"https://gitee.com/KFCFans/OhMyScheduler-Container-Template.git\",\"branch\":\"master\"}"; * @param fileLength 文件的字节数
* @return sleep 时间
String workerDirStr = OmsFileUtils.genTemporaryPath(); */
File workerDir = new File(workerDirStr); private long calculateSleepTime(long fileLength) {
FileUtils.forceMkdir(workerDir); return (fileLength / FileUtils.ONE_MB / 10 + 1) * 1000;
// git clone
GitRepoInfo gitRepoInfo = JsonUtils.parseObject(gitRepoInfoStr, GitRepoInfo.class);
CloneCommand cloneCommand = Git.cloneRepository()
.setDirectory(workerDir)
.setURI(gitRepoInfo.getRepo())
.setBranch(gitRepoInfo.getBranch());
if (!StringUtils.isEmpty(gitRepoInfo.getUsername())) {
CredentialsProvider credentialsProvider = new UsernamePasswordCredentialsProvider(gitRepoInfo.getUsername(), gitRepoInfo.getPassword());
cloneCommand.setCredentialsProvider(credentialsProvider);
}
cloneCommand.call();
// mvn clean package -DskipTests -U
Invoker mvnInvoker = new DefaultInvoker();
InvocationRequest ivkReq = new DefaultInvocationRequest();
ivkReq.setGoals(Lists.newArrayList("clean", "package", "-DskipTests", "-U"));
ivkReq.setBaseDirectory(workerDir);
ivkReq.setOutputHandler(line -> {
System.out.println(line);
});
InvocationResult mvnResult = mvnInvoker.execute(ivkReq);
if (mvnResult.getExitCode() != 0) {
// TODO输出失败信息
return;
}
String targetDirStr = workerDirStr + "/target";
File targetDir = new File(targetDirStr);
IOFileFilter fileFilter = FileFilterUtils.asFileFilter((dir, name) -> name.endsWith("jar-with-dependencies.jar"));
Collection<File> jarFile = FileUtils.listFiles(targetDir, fileFilter, null);
if (CollectionUtils.isEmpty(jarFile)) {
// TODO输出失败信息
return;
}
File jarWithDependency = jarFile.iterator().next();
} }
} }

View File

@ -65,7 +65,7 @@ public class ServerSelectService {
// 无可用Server重新进行Server选举需要加锁 // 无可用Server重新进行Server选举需要加锁
String lockName = String.format(SERVER_ELECT_LOCK, appId); String lockName = String.format(SERVER_ELECT_LOCK, appId);
boolean lockStatus = lockService.lock(lockName); boolean lockStatus = lockService.lock(lockName, 30000);
if (!lockStatus) { if (!lockStatus) {
try { try {
Thread.sleep(500); Thread.sleep(500);

View File

@ -30,14 +30,12 @@ public class DatabaseLockService implements LockService {
private Map<String, AtomicInteger> lockName2FailedTimes = Maps.newConcurrentMap(); private Map<String, AtomicInteger> lockName2FailedTimes = Maps.newConcurrentMap();
private static final int MAX_FAILED_NUM = 5; private static final int MAX_FAILED_NUM = 5;
// 最长持有锁30秒
private static final long LOCK_TIMEOUT_MS = 30000;
@Override @Override
public boolean lock(String name) { public boolean lock(String name, long maxLockTime) {
AtomicInteger failedCount = lockName2FailedTimes.computeIfAbsent(name, ignore -> new AtomicInteger(0)); AtomicInteger failedCount = lockName2FailedTimes.computeIfAbsent(name, ignore -> new AtomicInteger(0));
OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost()); OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost(), maxLockTime);
try { try {
omsLockRepository.saveAndFlush(newLock); omsLockRepository.saveAndFlush(newLock);
failedCount.set(0); failedCount.set(0);
@ -52,7 +50,7 @@ public class DatabaseLockService implements LockService {
OmsLockDO omsLockDO = omsLockRepository.findByLockName(name); OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime(); long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();
if (lockedMillions > LOCK_TIMEOUT_MS) { if (lockedMillions > omsLockDO.getMaxLockTime()) {
log.warn("[DatabaseLockService] The lock({}) already timeout, will be deleted now.", omsLockDO); log.warn("[DatabaseLockService] The lock({}) already timeout, will be deleted now.", omsLockDO);
unlock(name); unlock(name);
@ -73,27 +71,4 @@ public class DatabaseLockService implements LockService {
} }
} }
@Override
public boolean batchLock(List<String> names) {
List<OmsLockDO> locks = Lists.newLinkedList();
names.forEach(name -> locks.add(new OmsLockDO(name, NetUtils.getLocalHost())));
try {
omsLockRepository.saveAll(locks);
omsLockRepository.flush();
return true;
}catch (DataIntegrityViolationException ignore) {
}catch (Exception e) {
log.warn("[DatabaseLockService] write locks to database failed, lockNames = {}.", names, e);
}
return false;
}
@Override
public void batchUnLock(List<String> names) {
try {
CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockNames(names));
}catch (Exception e) {
log.error("[DatabaseLockService] unlocks {} failed.", names, e);
}
}
} }

View File

@ -1,7 +1,5 @@
package com.github.kfcfans.oms.server.service.lock; package com.github.kfcfans.oms.server.service.lock;
import java.util.List;
/** /**
* 锁服务所有方法都不允许抛出任何异常 * 锁服务所有方法都不允许抛出任何异常
* *
@ -13,17 +11,14 @@ public interface LockService {
/** /**
* 上锁获取锁立即返回不会阻塞等待锁 * 上锁获取锁立即返回不会阻塞等待锁
* @param name 锁名称 * @param name 锁名称
* @param maxLockTime 最长持有锁的时间单位毫秒ms
* @return true -> 获取到锁false -> 未获取到锁 * @return true -> 获取到锁false -> 未获取到锁
*/ */
boolean lock(String name); boolean lock(String name, long maxLockTime);
/** /**
* 释放锁 * 释放锁
* @param name 锁名称 * @param name 锁名称
*/ */
void unlock(String name); void unlock(String name);
boolean batchLock(List<String> names);
void batchUnLock(List<String> names);
} }

View File

@ -94,6 +94,7 @@ public class InstanceLogCleanService {
try { try {
// 计算时间 // 计算时间
Date date = DateUtils.addDays(new Date(), -remoteRetentionDay); Date date = DateUtils.addDays(new Date(), -remoteRetentionDay);
// TODO破坏性太强会波及所有的文件
Query mongoQuery = Query.query(Criteria.where("uploadDate").lt(date)); Query mongoQuery = Query.query(Criteria.where("uploadDate").lt(date));
gridFsTemplate.delete(mongoQuery); gridFsTemplate.delete(mongoQuery);
}catch (Exception e) { }catch (Exception e) {

View File

@ -182,13 +182,13 @@ public class InstanceLogService {
File stableLogFile = genStableLogFile(instanceId); File stableLogFile = genStableLogFile(instanceId);
// 将文件推送到 MongoDB // 将文件推送到 MongoDB
if (gridFsTemplate != null) { if (gridFsTemplate != null) {
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(stableLogFile))) { try {
InstanceLogMetadata metadata = new InstanceLogMetadata(); InstanceLogMetadata metadata = new InstanceLogMetadata();
metadata.setInstanceId(instanceId); metadata.setInstanceId(instanceId);
metadata.setFileSize(stableLogFile.length()); metadata.setFileSize(stableLogFile.length());
metadata.setCreatedTime(System.currentTimeMillis()); metadata.setCreatedTime(System.currentTimeMillis());
gridFsTemplate.store(bis, genMongoFileName(instanceId), metadata);
OmsFileUtils.storeFile2GridFS(gridFsTemplate, stableLogFile, genMongoFileName(instanceId), metadata);
log.info("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB succeed, using: {}.", instanceId, sw.stop()); log.info("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB succeed, using: {}.", instanceId, sw.stop());
}catch (Exception e) { }catch (Exception e) {
log.warn("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB failed.", instanceId, e); log.warn("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB failed.", instanceId, e);

View File

@ -49,8 +49,8 @@ public class ContainerController {
private ContainerService containerService; private ContainerService containerService;
@GetMapping("/downloadJar") @GetMapping("/downloadJar")
public void downloadJar(String md5, HttpServletResponse response) throws IOException { public void downloadJar(String filename, HttpServletResponse response) throws IOException {
File file = containerService.fetchContainerJarFile(md5); File file = containerService.fetchContainerJarFile(filename);
if (file.exists()) { if (file.exists()) {
OmsFileUtils.file2HttpResponse(file, response); OmsFileUtils.file2HttpResponse(file, response);
} }
@ -81,10 +81,7 @@ public class ContainerController {
// 2. 检查是否符合标准是否为Jar是否符合 template // 2. 检查是否符合标准是否为Jar是否符合 template
// 3. 生成MD5 // 3. 生成MD5
String md5; String md5 = OmsFileUtils.md5(jarFile);
try(FileInputStream fis = new FileInputStream(jarFile)) {
md5 = DigestUtils.md5DigestAsHex(fis);
}
// 3. 推送到 mongoDB // 3. 推送到 mongoDB
if (gridFsTemplate != null) { if (gridFsTemplate != null) {
@ -126,12 +123,6 @@ public class ContainerController {
return ResultDTO.success(res); return ResultDTO.success(res);
} }
@GetMapping("/deploy")
public ResultDTO<Void> deploy(Long containerId) {
// TODO最好支持显示阶段需要问问FN怎么搞
return ResultDTO.success(null);
}
@GetMapping("/listDeployedWorker") @GetMapping("/listDeployedWorker")
public ResultDTO<List<String>> listDeployedWorker(Long appId, Long containerId) { public ResultDTO<List<String>> listDeployedWorker(Long appId, Long containerId) {
// TODO本地 ContainerManager 直接返回 // TODO本地 ContainerManager 直接返回

View File

@ -0,0 +1,59 @@
package com.github.kfcfans.oms.server.web.websocket;
import com.github.kfcfans.oms.server.common.config.OmsEndpointConfigure;
import com.github.kfcfans.oms.server.service.ContainerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
/**
* 容器部署 WebSocket 服务
* 记录一个不错的 WebSocket 测试网站<a>http://www.easyswoole.com/wstool.html</a>
*
* @author tjq
* @since 2020/5/17
*/
@Slf4j
@Component
@ServerEndpoint(value = "/container/deploy/{name}", configurator = OmsEndpointConfigure.class)
public class ContainerDeployServerEndpoint {
@Resource
private ContainerService containerService;
@OnOpen
public void onOpen(@PathParam("name") String name, Session session) {
RemoteEndpoint.Async remote = session.getAsyncRemote();
remote.sendText("SYSTEM: connected successfully, start to deploy container: " + name);
try {
containerService.deploy(name, session);
}catch (Exception e) {
log.error("[ContainerDeployServerEndpoint] deploy container {} failed.", name, e);
remote.sendText("SYSTEM: deploy failed because of the exception");
remote.sendText(ExceptionUtils.getStackTrace(e));
}
try {
session.close();
}catch (Exception e) {
log.error("[ContainerDeployServerEndpoint] close session for {} failed.", name, e);
}
}
@OnError
public void onError(Session session, Throwable throwable) {
try {
session.close();
} catch (IOException e) {
log.error("[ContainerDeployServerEndpoint] close session failed.", e);
}
log.warn("[ContainerDeployServerEndpoint] session onError!", throwable);
}
}

View File

@ -45,7 +45,7 @@ public class RepositoryTest {
List<OmsLockDO> locks = Lists.newArrayList(); List<OmsLockDO> locks = Lists.newArrayList();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
OmsLockDO lockDO = new OmsLockDO("lock" + i, NetUtils.getLocalHost()); OmsLockDO lockDO = new OmsLockDO("lock" + i, NetUtils.getLocalHost(), 10000L);
locks.add(lockDO); locks.add(lockDO);
} }
omsLockRepository.saveAll(locks); omsLockRepository.saveAll(locks);

View File

@ -32,24 +32,11 @@ public class ServiceTest {
public void testLockService() { public void testLockService() {
String lockName = "myLock"; String lockName = "myLock";
lockService.lock(lockName); lockService.lock(lockName, 10000);
lockService.lock(lockName); lockService.lock(lockName, 10000);
lockService.unlock(lockName); lockService.unlock(lockName);
} }
@Test
public void testBatchLock() {
List<String> lockNames = Lists.newArrayList("a", "b", "C", "d", "e");
System.out.println(lockService.batchLock(lockNames));
System.out.println(lockService.batchLock(lockNames));
}
@Test
public void testBatchUnLock() {
List<String> lockNames = Lists.newArrayList("a", "b", "C", "d", "e");
lockService.batchUnLock(lockNames);
}
@Test @Test
public void testIdGenerator() { public void testIdGenerator() {
System.out.println(idGenerateService.allocate()); System.out.println(idGenerateService.allocate());