diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/RemoteConstant.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/RemoteConstant.java
index 9989bc5b..f2cdcc84 100644
--- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/RemoteConstant.java
+++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/RemoteConstant.java
@@ -16,6 +16,7 @@ public class RemoteConstant {
public static final String Task_TRACKER_ACTOR_NAME = "task_tracker";
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
+ public static final String WORKER_ACTOR_NAME = "worker";
public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf";
diff --git a/oh-my-scheduler-server/pom.xml b/oh-my-scheduler-server/pom.xml
index 1c8fc31f..f39fb1b3 100644
--- a/oh-my-scheduler-server/pom.xml
+++ b/oh-my-scheduler-server/pom.xml
@@ -55,6 +55,11 @@
spring-boot-starter-web
${springboot.version}
+
+ org.springframework.boot
+ spring-boot-starter-websocket
+ ${springboot.version}
+
org.springframework.boot
spring-boot-starter-data-jpa
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java
index a5e8034d..2a929e90 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java
@@ -68,4 +68,9 @@ public class OhMyServer {
String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.Task_TRACKER_ACTOR_NAME);
return actorSystem.actorSelection(path);
}
+
+ public static ActorSelection getWorkerActor(String address) {
+ String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.WORKER_ACTOR_NAME);
+ return actorSystem.actorSelection(path);
+ }
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/OmsEndpointConfigure.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/OmsEndpointConfigure.java
new file mode 100644
index 00000000..d41676b5
--- /dev/null
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/OmsEndpointConfigure.java
@@ -0,0 +1,31 @@
+package com.github.kfcfans.oms.server.common.config;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.server.ServerEndpointConfig;
+
+/**
+ * WebSocket 配置
+ * 解决 SpringBoot WebSocket 无法注入对象(@Resource/@Autowired)的问题
+ *
+ * @author tjq
+ * @since 2020/5/17
+ */
+@Component
+public class OmsEndpointConfigure extends ServerEndpointConfig.Configurator implements ApplicationContextAware {
+
+ private static volatile ApplicationContext context;
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ context = applicationContext;
+ }
+
+ @Override
+ public T getEndpointInstance(Class clazz) throws InstantiationException {
+ return context.getBean(clazz);
+ }
+}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/ThreadPoolConfig.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/ThreadPoolConfig.java
index e718cd2d..5d047c37 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/ThreadPoolConfig.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/ThreadPoolConfig.java
@@ -3,8 +3,10 @@ package com.github.kfcfans.oms.server.common.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@@ -50,4 +52,14 @@ public class ThreadPoolConfig {
return executor;
}
+ // 引入 WebSocket 支持后需要手动初始化调度线程池
+ @Bean
+ public TaskScheduler taskScheduler() {
+ ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+ scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
+ scheduler.setThreadNamePrefix("omsSchedulerPool-");
+ scheduler.setDaemon(true);
+ return scheduler;
+ }
+
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/WebConfig.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/WebConfig.java
index f4254ff8..d3c221e3 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/WebConfig.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/WebConfig.java
@@ -1,8 +1,11 @@
package com.github.kfcfans.oms.server.common.config;
+import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* CORS
@@ -11,10 +14,16 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
* @since 2020/4/13
*/
@Configuration
+@EnableWebSocket
public class WebConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedMethods("HEAD", "GET", "PUT", "POST", "DELETE", "PATCH");
}
+
+ @Bean
+ public ServerEndpointExporter serverEndpointExporter() {
+ return new ServerEndpointExporter();
+ }
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java
index a179f72e..948fddef 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java
@@ -1,8 +1,11 @@
package com.github.kfcfans.oms.server.common.utils;
+import com.github.kfcfans.oms.server.persistence.mongodb.InstanceLogMetadata;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.data.mongodb.gridfs.GridFsResource;
+import org.springframework.data.mongodb.gridfs.GridFsTemplate;
+import org.springframework.util.DigestUtils;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
@@ -98,4 +101,32 @@ public class OmsFileUtils {
ExceptionUtils.rethrow(ie);
}
}
+
+ /**
+ * 将文件保存到 GridFS
+ * @param gridFsTemplate gridFS操作模版
+ * @param localFile 本地文件
+ * @param remoteName 存储名称
+ * @param metadata 元数据
+ * @throws IOException 异常
+ */
+ public static void storeFile2GridFS(GridFsTemplate gridFsTemplate, File localFile, String remoteName, Object metadata) throws IOException {
+ try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(localFile))) {
+ gridFsTemplate.store(bis, remoteName, metadata);
+ }
+ }
+
+ /**
+ * 计算文件的 MD5
+ * @param f 文件
+ * @return md5
+ * @throws Exception 异常
+ */
+ public static String md5(File f) throws Exception {
+ String md5;
+ try(FileInputStream fis = new FileInputStream(f)) {
+ md5 = DigestUtils.md5DigestAsHex(fis);
+ }
+ return md5;
+ }
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/OmsLockDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/OmsLockDO.java
index e96f51d2..65b90bf8 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/OmsLockDO.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/OmsLockDO.java
@@ -25,12 +25,16 @@ public class OmsLockDO {
private String lockName;
private String ownerIP;
+ // 最长持有锁的时间
+ private Long maxLockTime;
+
private Date gmtCreate;
private Date gmtModified;
- public OmsLockDO(String lockName, String ownerIP) {
+ public OmsLockDO(String lockName, String ownerIP, Long maxLockTime) {
this.lockName = lockName;
this.ownerIP = ownerIP;
+ this.maxLockTime = maxLockTime;
this.gmtCreate = new Date();
this.gmtModified = this.gmtCreate;
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/OmsLockRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/OmsLockRepository.java
index 2e7a1ae7..f9f8b58d 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/OmsLockRepository.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/OmsLockRepository.java
@@ -21,10 +21,5 @@ public interface OmsLockRepository extends JpaRepository {
@Query(value = "delete from oms_lock where lock_name = ?1", nativeQuery = true)
int deleteByLockName(String lockName);
- @Modifying
- @Transactional
- @Query(value = "delete from oms_lock where lock_name in ?1", nativeQuery = true)
- int deleteByLockNames(List lockNames);
-
OmsLockDO findByLockName(String lockName);
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java
index 6cfda610..b69c2a3b 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java
@@ -1,12 +1,20 @@
package com.github.kfcfans.oms.server.service;
+import akka.actor.ActorSelection;
import com.github.kfcfans.oms.common.model.GitRepoInfo;
+import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.oms.common.utils.CommonUtils;
import com.github.kfcfans.oms.common.utils.JsonUtils;
+import com.github.kfcfans.oms.common.utils.NetUtils;
+import com.github.kfcfans.oms.server.akka.OhMyServer;
+import com.github.kfcfans.oms.server.akka.actors.ServerActor;
import com.github.kfcfans.oms.server.common.constans.ContainerSourceType;
import com.github.kfcfans.oms.server.common.utils.OmsFileUtils;
import com.github.kfcfans.oms.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.ContainerInfoRepository;
+import com.github.kfcfans.oms.server.service.ha.ClusterStatusHolder;
+import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
+import com.github.kfcfans.oms.server.service.lock.LockService;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
@@ -18,6 +26,7 @@ import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.transport.CredentialsProvider;
import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
import org.springframework.data.mongodb.gridfs.GridFsResource;
import org.springframework.data.mongodb.gridfs.GridFsTemplate;
import org.springframework.stereotype.Service;
@@ -25,10 +34,13 @@ import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
+import javax.websocket.RemoteEndpoint;
+import javax.websocket.Session;
import java.io.File;
-import java.io.FilenameFilter;
import java.util.Collection;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* 容器服务
@@ -40,26 +52,33 @@ import java.util.Optional;
@Service
public class ContainerService {
+ @Resource
+ private Environment environment;
+ @Resource
+ private LockService lockService;
@Resource
private ContainerInfoRepository containerInfoRepository;
private GridFsTemplate gridFsTemplate;
+ // 并发部署的机器数量
+ private static final int DEPLOY_BATCH_NUM = 50;
+
/**
* 获取构建容器所需要的 Jar 文件
- * @param md5 Jar文件的MD5值,可以由此构建 mongoDB 文件名
+ * @param filename 文件名称
* @return 本地Jar文件
*/
- public File fetchContainerJarFile(String md5) {
+ public File fetchContainerJarFile(String filename) {
- String jarFileName = OmsFileUtils.genContainerJarPath() + genContainerJarName(md5);
+ String jarFileName = OmsFileUtils.genContainerJarPath() + filename;
File jarFile = new File(jarFileName);
if (jarFile.exists()) {
return jarFile;
}
if (gridFsTemplate != null) {
- downloadJarFromGridFS(genContainerJarName(md5), jarFile);
+ downloadJarFromGridFS(filename, jarFile);
}
return jarFile;
}
@@ -67,17 +86,158 @@ public class ContainerService {
/**
* 部署容器
* @param containerName 容器名称
+ * @param session WebSocket Session
+ * @throws Exception 异常
*/
- public void deploy(String containerName) throws Exception {
- Optional containerInfoOpt = containerInfoRepository.findByContainerName(containerName);
- ContainerInfoDO container = containerInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find container by name: "+ containerName));
+ public void deploy(String containerName, Session session) throws Exception {
+ String deployLock = "containerDeployLock-" + containerName;
+ RemoteEndpoint.Async remote = session.getAsyncRemote();
+ // 最长部署时间:10分钟
+ boolean lock = lockService.lock(deployLock, 10 * 60 * 1000);
+ if (!lock) {
+ remote.sendText("SYSTEM: acquire deploy lock failed, maybe other user is deploying, please wait until the running deploy task finished.");
+ return;
+ }
+
+ try {
+
+ Optional containerInfoOpt = containerInfoRepository.findByContainerName(containerName);
+ if (!containerInfoOpt.isPresent()) {
+ remote.sendText("SYSTEM: can't find container by name: " + containerName);
+ return;
+ }
+ ContainerInfoDO container = containerInfoOpt.get();
+
+ // 准备文件
+ File jarFile = prepareJarFile(container, session);
+ double sizeMB = 1.0 * jarFile.length() / FileUtils.ONE_MB;
+ remote.sendText(String.format("SYSTEM: the jarFile(size=%fMB) is prepared and ready to be deployed to the worker.", sizeMB));
+
+ // 开始部署(需要分批进行)
+ Set workerAddressList = WorkerManagerService.getActiveWorkerInfo(container.getAppId()).keySet();
+ if (workerAddressList.isEmpty()) {
+ remote.sendText("SYSTEM: there is no worker available now, deploy failed!");
+ return;
+ }
+
+ String port = environment.getProperty("local.server.port");
+ String downloadURL = String.format("http://%s:%s/container/downloadJar?filename=%s", NetUtils.getLocalHost(), port, jarFile.getName());
+ ServerDeployContainerRequest req = new ServerDeployContainerRequest(containerName, container.getMd5(), downloadURL);
+ long sleepTime = calculateSleepTime(jarFile.length());
+
+ AtomicInteger count = new AtomicInteger();
+ workerAddressList.forEach(akkaAddress -> {
+ ActorSelection workerActor = OhMyServer.getWorkerActor(akkaAddress);
+ workerActor.tell(req, null);
+
+ remote.sendText("SYSTEM: send deploy request to " + akkaAddress);
+
+ if (count.incrementAndGet() % DEPLOY_BATCH_NUM == 0) {
+ CommonUtils.executeIgnoreException(() -> Thread.sleep(sleepTime));
+ }
+ });
+
+ remote.sendText("SYSTEM: deploy finished, congratulations!");
+
+ }finally {
+ lockService.unlock(deployLock);
+ }
+ }
+
+ private File prepareJarFile(ContainerInfoDO container, Session session) throws Exception {
+
+ RemoteEndpoint.Async remote = session.getAsyncRemote();
// 获取Jar,Git需要先 clone成Jar计算MD5,JarFile则直接下载
ContainerSourceType sourceType = ContainerSourceType.of(container.getSourceType());
if (sourceType == ContainerSourceType.Git) {
+ String workerDirStr = OmsFileUtils.genTemporaryPath();
+ File workerDir = new File(workerDirStr);
+ FileUtils.forceMkdir(workerDir);
+ // git clone
+ remote.sendText("SYSTEM: start to git clone the code repo, using config: " + container.getSourceInfo());
+ GitRepoInfo gitRepoInfo = JsonUtils.parseObject(container.getSourceInfo(), GitRepoInfo.class);
+
+ CloneCommand cloneCommand = Git.cloneRepository()
+ .setDirectory(workerDir)
+ .setURI(gitRepoInfo.getRepo())
+ .setBranch(gitRepoInfo.getBranch());
+ if (!StringUtils.isEmpty(gitRepoInfo.getUsername())) {
+ CredentialsProvider credentialsProvider = new UsernamePasswordCredentialsProvider(gitRepoInfo.getUsername(), gitRepoInfo.getPassword());
+ cloneCommand.setCredentialsProvider(credentialsProvider);
+ }
+ cloneCommand.call();
+
+ // mvn clean package -DskipTests -U
+ remote.sendText("SYSTEM: git clone successfully, star to compile the project.");
+ Invoker mvnInvoker = new DefaultInvoker();
+ InvocationRequest ivkReq = new DefaultInvocationRequest();
+ ivkReq.setGoals(Lists.newArrayList("clean", "package", "-DskipTests", "-U"));
+ ivkReq.setBaseDirectory(workerDir);
+ ivkReq.setOutputHandler(remote::sendText);
+
+ InvocationResult mvnResult = mvnInvoker.execute(ivkReq);
+ if (mvnResult.getExitCode() != 0) {
+ throw mvnResult.getExecutionException();
+ }
+
+ String targetDirStr = workerDirStr + "/target";
+ File targetDir = new File(targetDirStr);
+ IOFileFilter fileFilter = FileFilterUtils.asFileFilter((dir, name) -> name.endsWith("jar-with-dependencies.jar"));
+ Collection jarFile = FileUtils.listFiles(targetDir, fileFilter, null);
+
+ if (CollectionUtils.isEmpty(jarFile)) {
+ remote.sendText("SYSTEM: can't find packaged jar, deploy failed!");
+ throw new RuntimeException("can't find packaged jar");
+ }
+
+ File jarWithDependency = jarFile.iterator().next();
+ String md5 = OmsFileUtils.md5(jarWithDependency);
+ // 更新 MD5
+ container.setMd5(md5);
+
+ String jarFileName = genContainerJarName(md5);
+ GridFsResource resource = gridFsTemplate.getResource(jarFileName);
+
+ if (!resource.exists()) {
+ remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version.");
+ OmsFileUtils.storeFile2GridFS(gridFsTemplate, jarWithDependency, jarFileName, null);
+ remote.sendText("SYSTEM: upload to GridFS successfully~");
+ }
+
+ // 将文件从临时工作目录移动到正式目录
+ String localFileStr = OmsFileUtils.genContainerJarPath() + jarFileName;
+ File localFile = new File(localFileStr);
+ FileUtils.forceDelete(localFile);
+ FileUtils.copyFile(jarWithDependency, localFile);
+
+ // 删除工作区数据
+ FileUtils.forceDelete(workerDir);
+
+ return localFile;
}
+
+ // 先查询本地是否存在目标 Jar 文件
+ String jarFileName = genContainerJarName(container.getMd5());
+ String localFileStr = OmsFileUtils.genContainerJarPath() + jarFileName;
+ File localFile = new File(localFileStr);
+ if (localFile.exists()) {
+ remote.sendText("SYSTEM: find the jar file in local disk.");
+ return localFile;
+ }
+
+ // 从 MongoDB 下载
+ GridFsResource resource = gridFsTemplate.getResource(jarFileName);
+ if (!resource.exists()) {
+ remote.sendText(String.format("SYSTEM: can't find %s in local disk and GridFS, deploy failed!", jarFileName));
+ throw new RuntimeException("can't find jar");
+ }
+ remote.sendText("SYSTEM: start to download jar file from GridFS......");
+ OmsFileUtils.gridFs2File(resource, localFile);
+ remote.sendText("SYSTEM: download jar file from GridFS successfully~");
+ return localFile;
}
private void downloadJarFromGridFS(String mongoFileName, File targetFile) {
@@ -108,53 +268,12 @@ public class ContainerService {
this.gridFsTemplate = gridFsTemplate;
}
- public static void main(String[] args) throws Exception {
-
- String gitRepoInfoStr = "{\"repo\":\"https://gitee.com/KFCFans/OhMyScheduler-Container-Template.git\",\"branch\":\"master\"}";
-
- String workerDirStr = OmsFileUtils.genTemporaryPath();
- File workerDir = new File(workerDirStr);
- FileUtils.forceMkdir(workerDir);
-
- // git clone
- GitRepoInfo gitRepoInfo = JsonUtils.parseObject(gitRepoInfoStr, GitRepoInfo.class);
-
- CloneCommand cloneCommand = Git.cloneRepository()
- .setDirectory(workerDir)
- .setURI(gitRepoInfo.getRepo())
- .setBranch(gitRepoInfo.getBranch());
- if (!StringUtils.isEmpty(gitRepoInfo.getUsername())) {
- CredentialsProvider credentialsProvider = new UsernamePasswordCredentialsProvider(gitRepoInfo.getUsername(), gitRepoInfo.getPassword());
- cloneCommand.setCredentialsProvider(credentialsProvider);
- }
- cloneCommand.call();
-
- // mvn clean package -DskipTests -U
- Invoker mvnInvoker = new DefaultInvoker();
- InvocationRequest ivkReq = new DefaultInvocationRequest();
- ivkReq.setGoals(Lists.newArrayList("clean", "package", "-DskipTests", "-U"));
- ivkReq.setBaseDirectory(workerDir);
- ivkReq.setOutputHandler(line -> {
- System.out.println(line);
- });
-
- InvocationResult mvnResult = mvnInvoker.execute(ivkReq);
- if (mvnResult.getExitCode() != 0) {
- // TODO:输出失败信息
- return;
- }
-
- String targetDirStr = workerDirStr + "/target";
- File targetDir = new File(targetDirStr);
- IOFileFilter fileFilter = FileFilterUtils.asFileFilter((dir, name) -> name.endsWith("jar-with-dependencies.jar"));
- Collection jarFile = FileUtils.listFiles(targetDir, fileFilter, null);
-
- if (CollectionUtils.isEmpty(jarFile)) {
- // TODO:输出失败信息
- return;
- }
-
- File jarWithDependency = jarFile.iterator().next();
-
+ /**
+ * 计算 sleep 时间(每10M睡眠1S + 1)
+ * @param fileLength 文件的字节数
+ * @return sleep 时间
+ */
+ private long calculateSleepTime(long fileLength) {
+ return (fileLength / FileUtils.ONE_MB / 10 + 1) * 1000;
}
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java
index 9c1b329f..684facd5 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java
@@ -65,7 +65,7 @@ public class ServerSelectService {
// 无可用Server,重新进行Server选举,需要加锁
String lockName = String.format(SERVER_ELECT_LOCK, appId);
- boolean lockStatus = lockService.lock(lockName);
+ boolean lockStatus = lockService.lock(lockName, 30000);
if (!lockStatus) {
try {
Thread.sleep(500);
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/DatabaseLockService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/DatabaseLockService.java
index 227d82ff..3f73cd2d 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/DatabaseLockService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/DatabaseLockService.java
@@ -30,14 +30,12 @@ public class DatabaseLockService implements LockService {
private Map lockName2FailedTimes = Maps.newConcurrentMap();
private static final int MAX_FAILED_NUM = 5;
- // 最长持有锁30秒
- private static final long LOCK_TIMEOUT_MS = 30000;
@Override
- public boolean lock(String name) {
+ public boolean lock(String name, long maxLockTime) {
AtomicInteger failedCount = lockName2FailedTimes.computeIfAbsent(name, ignore -> new AtomicInteger(0));
- OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost());
+ OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost(), maxLockTime);
try {
omsLockRepository.saveAndFlush(newLock);
failedCount.set(0);
@@ -52,7 +50,7 @@ public class DatabaseLockService implements LockService {
OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();
- if (lockedMillions > LOCK_TIMEOUT_MS) {
+ if (lockedMillions > omsLockDO.getMaxLockTime()) {
log.warn("[DatabaseLockService] The lock({}) already timeout, will be deleted now.", omsLockDO);
unlock(name);
@@ -73,27 +71,4 @@ public class DatabaseLockService implements LockService {
}
}
- @Override
- public boolean batchLock(List names) {
- List locks = Lists.newLinkedList();
- names.forEach(name -> locks.add(new OmsLockDO(name, NetUtils.getLocalHost())));
- try {
- omsLockRepository.saveAll(locks);
- omsLockRepository.flush();
- return true;
- }catch (DataIntegrityViolationException ignore) {
- }catch (Exception e) {
- log.warn("[DatabaseLockService] write locks to database failed, lockNames = {}.", names, e);
- }
- return false;
- }
-
- @Override
- public void batchUnLock(List names) {
- try {
- CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockNames(names));
- }catch (Exception e) {
- log.error("[DatabaseLockService] unlocks {} failed.", names, e);
- }
- }
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/LockService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/LockService.java
index 401003e7..3a1e19a9 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/LockService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/lock/LockService.java
@@ -1,7 +1,5 @@
package com.github.kfcfans.oms.server.service.lock;
-import java.util.List;
-
/**
* 锁服务,所有方法都不允许抛出任何异常!
*
@@ -13,17 +11,14 @@ public interface LockService {
/**
* 上锁(获取锁),立即返回,不会阻塞等待锁
* @param name 锁名称
+ * @param maxLockTime 最长持有锁的时间,单位毫秒(ms)
* @return true -> 获取到锁,false -> 未获取到锁
*/
- boolean lock(String name);
+ boolean lock(String name, long maxLockTime);
/**
* 释放锁
* @param name 锁名称
*/
void unlock(String name);
-
- boolean batchLock(List names);
- void batchUnLock(List names);
-
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogCleanService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogCleanService.java
index a196f061..4273a5d2 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogCleanService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogCleanService.java
@@ -94,6 +94,7 @@ public class InstanceLogCleanService {
try {
// 计算时间
Date date = DateUtils.addDays(new Date(), -remoteRetentionDay);
+ // TODO:破坏性太强,会波及所有的文件
Query mongoQuery = Query.query(Criteria.where("uploadDate").lt(date));
gridFsTemplate.delete(mongoQuery);
}catch (Exception e) {
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java
index ed38c68f..08d588f8 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java
@@ -182,13 +182,13 @@ public class InstanceLogService {
File stableLogFile = genStableLogFile(instanceId);
// 将文件推送到 MongoDB
if (gridFsTemplate != null) {
- try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(stableLogFile))) {
-
+ try {
InstanceLogMetadata metadata = new InstanceLogMetadata();
metadata.setInstanceId(instanceId);
metadata.setFileSize(stableLogFile.length());
metadata.setCreatedTime(System.currentTimeMillis());
- gridFsTemplate.store(bis, genMongoFileName(instanceId), metadata);
+
+ OmsFileUtils.storeFile2GridFS(gridFsTemplate, stableLogFile, genMongoFileName(instanceId), metadata);
log.info("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB succeed, using: {}.", instanceId, sw.stop());
}catch (Exception e) {
log.warn("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB failed.", instanceId, e);
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java
index 65f3c794..91955263 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java
@@ -49,8 +49,8 @@ public class ContainerController {
private ContainerService containerService;
@GetMapping("/downloadJar")
- public void downloadJar(String md5, HttpServletResponse response) throws IOException {
- File file = containerService.fetchContainerJarFile(md5);
+ public void downloadJar(String filename, HttpServletResponse response) throws IOException {
+ File file = containerService.fetchContainerJarFile(filename);
if (file.exists()) {
OmsFileUtils.file2HttpResponse(file, response);
}
@@ -81,10 +81,7 @@ public class ContainerController {
// 2. 检查是否符合标准(是否为Jar,是否符合 template)
// 3. 生成MD5
- String md5;
- try(FileInputStream fis = new FileInputStream(jarFile)) {
- md5 = DigestUtils.md5DigestAsHex(fis);
- }
+ String md5 = OmsFileUtils.md5(jarFile);
// 3. 推送到 mongoDB
if (gridFsTemplate != null) {
@@ -126,12 +123,6 @@ public class ContainerController {
return ResultDTO.success(res);
}
- @GetMapping("/deploy")
- public ResultDTO deploy(Long containerId) {
- // TODO:最好支持显示阶段,需要问问FN怎么搞
- return ResultDTO.success(null);
- }
-
@GetMapping("/listDeployedWorker")
public ResultDTO> listDeployedWorker(Long appId, Long containerId) {
// TODO:本地 ContainerManager 直接返回
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/websocket/ContainerDeployServerEndpoint.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/websocket/ContainerDeployServerEndpoint.java
new file mode 100644
index 00000000..bf0ec691
--- /dev/null
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/websocket/ContainerDeployServerEndpoint.java
@@ -0,0 +1,59 @@
+package com.github.kfcfans.oms.server.web.websocket;
+
+import com.github.kfcfans.oms.server.common.config.OmsEndpointConfigure;
+import com.github.kfcfans.oms.server.service.ContainerService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+
+/**
+ * 容器部署 WebSocket 服务
+ * 记录一个不错的 WebSocket 测试网站:http://www.easyswoole.com/wstool.html
+ *
+ * @author tjq
+ * @since 2020/5/17
+ */
+@Slf4j
+@Component
+@ServerEndpoint(value = "/container/deploy/{name}", configurator = OmsEndpointConfigure.class)
+public class ContainerDeployServerEndpoint {
+
+ @Resource
+ private ContainerService containerService;
+
+ @OnOpen
+ public void onOpen(@PathParam("name") String name, Session session) {
+
+ RemoteEndpoint.Async remote = session.getAsyncRemote();
+ remote.sendText("SYSTEM: connected successfully, start to deploy container: " + name);
+ try {
+ containerService.deploy(name, session);
+ }catch (Exception e) {
+ log.error("[ContainerDeployServerEndpoint] deploy container {} failed.", name, e);
+
+ remote.sendText("SYSTEM: deploy failed because of the exception");
+ remote.sendText(ExceptionUtils.getStackTrace(e));
+ }
+ try {
+ session.close();
+ }catch (Exception e) {
+ log.error("[ContainerDeployServerEndpoint] close session for {} failed.", name, e);
+ }
+ }
+
+ @OnError
+ public void onError(Session session, Throwable throwable) {
+ try {
+ session.close();
+ } catch (IOException e) {
+ log.error("[ContainerDeployServerEndpoint] close session failed.", e);
+ }
+ log.warn("[ContainerDeployServerEndpoint] session onError!", throwable);
+ }
+}
diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java
index 4ab3b498..6ea58fdc 100644
--- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java
+++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java
@@ -45,7 +45,7 @@ public class RepositoryTest {
List locks = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
- OmsLockDO lockDO = new OmsLockDO("lock" + i, NetUtils.getLocalHost());
+ OmsLockDO lockDO = new OmsLockDO("lock" + i, NetUtils.getLocalHost(), 10000L);
locks.add(lockDO);
}
omsLockRepository.saveAll(locks);
diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/ServiceTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/ServiceTest.java
index 47ab56d6..2f6862b6 100644
--- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/ServiceTest.java
+++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/ServiceTest.java
@@ -32,24 +32,11 @@ public class ServiceTest {
public void testLockService() {
String lockName = "myLock";
- lockService.lock(lockName);
- lockService.lock(lockName);
+ lockService.lock(lockName, 10000);
+ lockService.lock(lockName, 10000);
lockService.unlock(lockName);
}
- @Test
- public void testBatchLock() {
- List lockNames = Lists.newArrayList("a", "b", "C", "d", "e");
- System.out.println(lockService.batchLock(lockNames));
- System.out.println(lockService.batchLock(lockNames));
- }
-
- @Test
- public void testBatchUnLock() {
- List lockNames = Lists.newArrayList("a", "b", "C", "d", "e");
- lockService.batchUnLock(lockNames);
- }
-
@Test
public void testIdGenerator() {
System.out.println(idGenerateService.allocate());