diff --git a/oh-my-scheduler-common/pom.xml b/oh-my-scheduler-common/pom.xml
index d9d6acf4..3ceae542 100644
--- a/oh-my-scheduler-common/pom.xml
+++ b/oh-my-scheduler-common/pom.xml
@@ -16,6 +16,7 @@
1.7.30
3.10
+ 2.6
29.0-jre
4.4.1
2.6.4
@@ -61,6 +62,13 @@
akka-serialization-jackson_2.13
${akka.version}
+
+
+
+ commons-io
+ commons-io
+ ${commons.io.version}
+
\ No newline at end of file
diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/ProcessorType.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/ProcessorType.java
index a2db9ab5..98e3ba5c 100644
--- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/ProcessorType.java
+++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/ProcessorType.java
@@ -15,7 +15,8 @@ public enum ProcessorType {
EMBEDDED_JAVA(1, "内置JAVA处理器"),
SHELL(2, "SHELL脚本"),
- PYTHON(3, "Python脚本");
+ PYTHON(3, "Python脚本"),
+ JAVA_CONTAINER(4, "Java容器");
private int v;
private String des;
diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/ServerDeployContainerRequest.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/ServerDeployContainerRequest.java
new file mode 100644
index 00000000..ace2fe8f
--- /dev/null
+++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/ServerDeployContainerRequest.java
@@ -0,0 +1,31 @@
+package com.github.kfcfans.oms.common.request;
+
+import com.github.kfcfans.oms.common.OmsSerializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Worker部署Container请求
+ *
+ * @author tjq
+ * @since 2020/5/16
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ServerDeployContainerRequest implements OmsSerializable {
+
+ /**
+ * 容器名称
+ */
+ private String containerName;
+ /**
+ * 文件名(MD5值),用于做版本校验和文件下载
+ */
+ private String md5;
+ /**
+ * 下载地址
+ */
+ private String downloadURL;
+}
diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/WorkerNeedDeployContainerRequest.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/WorkerNeedDeployContainerRequest.java
new file mode 100644
index 00000000..41d4a5d0
--- /dev/null
+++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/WorkerNeedDeployContainerRequest.java
@@ -0,0 +1,19 @@
+package com.github.kfcfans.oms.common.request.http;
+
+import com.github.kfcfans.oms.common.OmsSerializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Worker需要部署容器,主动向Server请求信息
+ *
+ * @author tjq
+ * @since 2020/5/16
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkerNeedDeployContainerRequest implements OmsSerializable {
+ private String containerName;
+}
diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/CommonUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/CommonUtils.java
index c4d4dcbc..8e155065 100644
--- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/CommonUtils.java
+++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/CommonUtils.java
@@ -3,6 +3,7 @@ package com.github.kfcfans.oms.common.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.Collection;
+import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -88,4 +89,11 @@ public class CommonUtils {
}catch (Exception ignore) {
}
}
+
+ public static void executeIgnoreException(Meaningless executor) {
+ try {
+ executor.m();
+ }catch (Exception ignore) {
+ }
+ }
}
diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/Meaningless.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/Meaningless.java
new file mode 100644
index 00000000..ebe62ed5
--- /dev/null
+++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/Meaningless.java
@@ -0,0 +1,12 @@
+package com.github.kfcfans.oms.common.utils;
+
+/**
+ * 毫无意义就是最大的意义
+ *
+ * @author tjq
+ * @since 2020/5/16
+ */
+@FunctionalInterface
+public interface Meaningless {
+ void m() throws Exception;
+}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java
index fb6ffca9..b2f1ed7e 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java
@@ -2,15 +2,23 @@ package com.github.kfcfans.oms.server.akka.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.oms.common.InstanceStatus;
+import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.oms.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.oms.common.request.WorkerHeartbeat;
import com.github.kfcfans.oms.common.request.WorkerLogReportReq;
+import com.github.kfcfans.oms.common.request.http.WorkerNeedDeployContainerRequest;
import com.github.kfcfans.oms.common.response.AskResponse;
+import com.github.kfcfans.oms.common.utils.JsonUtils;
+import com.github.kfcfans.oms.common.utils.NetUtils;
import com.github.kfcfans.oms.server.common.utils.SpringUtils;
+import com.github.kfcfans.oms.server.persistence.core.model.ContainerInfoDO;
+import com.github.kfcfans.oms.server.persistence.core.repository.ContainerInfoRepository;
import com.github.kfcfans.oms.server.service.log.InstanceLogService;
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
+import org.springframework.core.env.Environment;
/**
* 处理 Worker 请求
@@ -27,6 +35,7 @@ public class ServerActor extends AbstractActor {
.match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat)
.match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq)
.match(WorkerLogReportReq.class, this::onReceiveWorkerLogReportReq)
+ .match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build();
}
@@ -57,8 +66,39 @@ public class ServerActor extends AbstractActor {
}
}
+ /**
+ * 处理OMS在线日志请求
+ * @param req 日志请求
+ */
private void onReceiveWorkerLogReportReq(WorkerLogReportReq req) {
// 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧...
SpringUtils.getBean(InstanceLogService.class).submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
}
+
+ /**
+ * 处理 Worker容器部署请求
+ * @param req 容器部署请求
+ */
+ private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) {
+
+ ContainerInfoRepository containerInfoRepository = SpringUtils.getBean(ContainerInfoRepository.class);
+ Environment environment = SpringUtils.getBean(Environment.class);
+ String port = environment.getProperty("local.server.port");
+
+ ContainerInfoDO containerInfo = containerInfoRepository.findByContainerName(req.getContainerName());
+ AskResponse askResponse = new AskResponse();
+ askResponse.setSuccess(false);
+ if (containerInfo != null) {
+ askResponse.setSuccess(true);
+
+ ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest();
+ BeanUtils.copyProperties(containerInfo, dpReq);
+ String downloadURL = String.format("http://%s:%s/container/downloadJar?md5=%s", NetUtils.getLocalHost(), port, containerInfo.getMd5());
+ dpReq.setDownloadURL(downloadURL);
+
+ askResponse.setData(JsonUtils.toBytes(dpReq));
+ }
+
+ getSender().tell(askResponse, getSelf());
+ }
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/ContainerTemplateGenerator.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/ContainerTemplateGenerator.java
index 982ac4fd..08b0c085 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/ContainerTemplateGenerator.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/ContainerTemplateGenerator.java
@@ -2,6 +2,7 @@ package com.github.kfcfans.oms.server.common.utils;
import com.github.kfcfans.oms.common.ContainerConstant;
import net.lingala.zip4j.ZipFile;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
@@ -72,7 +73,7 @@ public class ContainerTemplateGenerator {
// 2. 新建目录
String packagePath = StringUtils.replace(packageName, ".", "/");
String absPath = rootPath + "/src/main/java/" + packagePath;
- OmsFileUtils.forceMkdir(new File(absPath));
+ FileUtils.forceMkdir(new File(absPath));
// 3. 修改 Spring 配置文件
String resourcePath = rootPath + "/src/main/resources/";
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java
index 35c01ea7..a179f72e 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java
@@ -2,6 +2,7 @@ package com.github.kfcfans.oms.server.common.utils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.springframework.data.mongodb.gridfs.GridFsResource;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
@@ -44,38 +45,6 @@ public class OmsFileUtils {
return COMMON_PATH + "temporary/" + uuid + "/";
}
- /**
- * 为目标文件创建父文件夹
- * @param file 目标文件
- */
- public static void forceMkdir4Parent(File file) {
- File directory = file.getParentFile();
- forceMkdir(directory);
- }
-
- public static void forceMkdir(File directory) {
- if (directory.exists()) {
- if (!directory.isDirectory()) {
- final String message =
- "File "
- + directory
- + " exists and is "
- + "not a directory. Unable to create directory.";
- throw new RuntimeException(message);
- }
- } else {
- if (!directory.mkdirs()) {
- // Double-check that some other thread or process hasn't made
- // the directory in the background
- if (!directory.isDirectory()) {
- final String message =
- "Unable to create directory " + directory;
- throw new RuntimeException(message);
- }
- }
- }
- }
-
/**
* 将文本写入文件
* @param content 文本内容
@@ -89,6 +58,12 @@ public class OmsFileUtils {
}
}
+ /**
+ * 输出文件(对外下载功能)
+ * @param file 文件
+ * @param response HTTP响应
+ * @throws IOException 异常
+ */
public static void file2HttpResponse(File file, HttpServletResponse response) throws IOException {
response.setContentType("application/octet-stream");
@@ -103,4 +78,24 @@ public class OmsFileUtils {
}
}
}
+
+ /**
+ * 将 mongoDB 中的数据转存到本地文件中
+ * @param gridFsResource mongoDB 文件资源
+ * @param targetFile 本地文件资源
+ */
+ public static void gridFs2File(GridFsResource gridFsResource, File targetFile) {
+
+ byte[] buffer = new byte[1024];
+ try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream());
+ BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(targetFile))
+ ) {
+ while (gis.read(buffer) != -1) {
+ bos.write(buffer);
+ }
+ bos.flush();
+ }catch (IOException ie) {
+ ExceptionUtils.rethrow(ie);
+ }
+ }
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/ContainerInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/ContainerInfoDO.java
index 3bba8221..3ebb48c3 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/ContainerInfoDO.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/ContainerInfoDO.java
@@ -30,8 +30,8 @@ public class ContainerInfoDO {
// 由 sourceType 决定,JarFile -> String,存储文件名称;Git -> JSON,包括 URL,branch,username,password
private String sourceInfo;
- // 文件名称(jar的MD5,唯一,作为 GridFS 的文件名)
- private String fileName;
+ // jar的MD5,唯一,作为 GridFS 的文件名
+ private String md5;
// 状态,枚举值为 ContainerStatus
private Integer status;
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/ContainerInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/ContainerInfoRepository.java
index 85c22c5f..13b1fcf3 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/ContainerInfoRepository.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/ContainerInfoRepository.java
@@ -15,4 +15,6 @@ public interface ContainerInfoRepository extends JpaRepository findByAppId(Long appId);
+ ContainerInfoDO findByContainerName(String containerName);
+
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java
new file mode 100644
index 00000000..ff1ff849
--- /dev/null
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java
@@ -0,0 +1,73 @@
+package com.github.kfcfans.oms.server.service;
+
+import com.github.kfcfans.oms.common.utils.CommonUtils;
+import com.github.kfcfans.oms.server.common.utils.OmsFileUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.mongodb.gridfs.GridFsResource;
+import org.springframework.data.mongodb.gridfs.GridFsTemplate;
+import org.springframework.stereotype.Service;
+
+import java.io.File;
+
+/**
+ * 容器服务
+ *
+ * @author tjq
+ * @since 2020/5/16
+ */
+@Slf4j
+@Service
+public class ContainerService {
+
+ private GridFsTemplate gridFsTemplate;
+
+ /**
+ * 获取构建容器所需要的 Jar 文件
+ * @param md5 Jar文件的MD5值,可以由此构建 mongoDB 文件名
+ * @return 本地Jar文件
+ */
+ public File fetchContainerJarFile(String md5) {
+
+ String jarFileName = OmsFileUtils.genContainerJarPath() + genContainerJarName(md5);
+ File jarFile = new File(jarFileName);
+
+ if (jarFile.exists()) {
+ return jarFile;
+ }
+ if (gridFsTemplate != null) {
+ downloadJarFromMongoDB(genContainerJarName(md5), jarFile);
+ }
+ return jarFile;
+ }
+
+ private void downloadJarFromMongoDB(String mongoFileName, File targetFile) {
+ synchronized (mongoFileName.intern()) {
+ if (targetFile.exists()) {
+ return;
+ }
+ GridFsResource gridFsResource = gridFsTemplate.getResource(mongoFileName);
+ if (!gridFsResource.exists()) {
+ log.warn("[ContainerService] can't find container's jar file({}) in gridFS.", mongoFileName);
+ return;
+ }
+ try {
+ OmsFileUtils.gridFs2File(gridFsResource, targetFile);
+ }catch (Exception e) {
+ CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(targetFile));
+ throw e;
+ }
+ }
+ }
+
+
+ private static String genContainerJarName(String md5) {
+ return String.format("oms-container-%s.jar", md5);
+ }
+
+ @Autowired(required = false)
+ public void setGridFsTemplate(GridFsTemplate gridFsTemplate) {
+ this.gridFsTemplate = gridFsTemplate;
+ }
+}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java
index 977ae9b0..ed38c68f 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java
@@ -15,6 +15,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.springframework.beans.BeanUtils;
@@ -102,6 +103,7 @@ public class InstanceLogService {
public StringPage fetchInstanceLog(Long instanceId, long index) {
try {
Future fileFuture = prepareLogFile(instanceId);
+ // 超时并不会打断正在执行的任务
File logFile = fileFuture.get(5, TimeUnit.SECONDS);
// 分页展示数据
@@ -215,15 +217,19 @@ public class InstanceLogService {
if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < EXPIRE_INTERVAL_MS) {
return f;
}
+ try {
+ // 创建父文件夹(文件在开流时自动会被创建)
+ FileUtils.forceMkdirParent(f);
- // 创建父文件夹(文件在开流时自动会被创建)
- OmsFileUtils.forceMkdir4Parent(f);
-
- // 重新构建文件
- try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
- stream2File(allLogStream, f);
+ // 重新构建文件
+ try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
+ stream2File(allLogStream, f);
+ }
+ return f;
+ }catch (Exception e) {
+ CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f));
+ throw new RuntimeException(e);
}
- return f;
});
}
}
@@ -232,36 +238,42 @@ public class InstanceLogService {
String path = genLogFilePath(instanceId, true);
synchronized (("stFileLock-" + instanceId).intern()) {
return localTransactionTemplate.execute(status -> {
+
File f = new File(path);
if (f.exists()) {
return f;
}
- // 创建父文件夹(文件在开流时自动会被创建)
- OmsFileUtils.forceMkdir4Parent(f);
+ try {
+ // 创建父文件夹(文件在开流时自动会被创建)
+ FileUtils.forceMkdirParent(f);
- // 本地存在数据,从本地持久化(对应 SYNC 的情况)
- if (instanceId2LastReportTime.containsKey(instanceId)) {
- try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
- stream2File(allLogStream, f);
+ // 本地存在数据,从本地持久化(对应 SYNC 的情况)
+ if (instanceId2LastReportTime.containsKey(instanceId)) {
+ try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
+ stream2File(allLogStream, f);
+ }
+ }else {
+
+ if (gridFsTemplate == null) {
+ OmsFileUtils.string2File("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.", f);
+ return f;
+ }
+
+ // 否则从 mongoDB 拉取数据(对应后期查询的情况)
+ GridFsResource gridFsResource = gridFsTemplate.getResource(genMongoFileName(instanceId));
+
+ if (!gridFsResource.exists()) {
+ OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f);
+ return f;
+ }
+ OmsFileUtils.gridFs2File(gridFsResource, f);
}
- }else {
-
- if (gridFsTemplate == null) {
- OmsFileUtils.string2File("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.", f);
- return f;
- }
-
- // 否则从 mongoDB 拉取数据(对应后期查询的情况)
- GridFsResource gridFsResource = gridFsTemplate.getResource(genMongoFileName(instanceId));
-
- if (!gridFsResource.exists()) {
- OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f);
- return f;
- }
- gridFs2File(gridFsResource, f);
+ return f;
+ }catch (Exception e) {
+ CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f));
+ throw new RuntimeException(e);
}
- return f;
});
}
}
@@ -284,24 +296,6 @@ public class InstanceLogService {
}
}
- /**
- * 将MongoDB中存储的日志持久化为磁盘日志
- * @param gridFsResource mongoDB 文件资源
- * @param logFile 本地文件资源
- */
- private void gridFs2File(GridFsResource gridFsResource, File logFile) {
- byte[] buffer = new byte[1024];
- try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream());
- BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile))
- ) {
- while (gis.read(buffer) != -1) {
- bos.write(buffer);
- }
- bos.flush();
- }catch (IOException ie) {
- ExceptionUtils.rethrow(ie);
- }
- }
/**
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java
index c6fc8a96..65f3c794 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java
@@ -6,11 +6,13 @@ import com.github.kfcfans.oms.server.common.utils.ContainerTemplateGenerator;
import com.github.kfcfans.oms.server.common.utils.OmsFileUtils;
import com.github.kfcfans.oms.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.ContainerInfoRepository;
+import com.github.kfcfans.oms.server.service.ContainerService;
import com.github.kfcfans.oms.server.web.request.GenerateContainerTemplateRequest;
import com.github.kfcfans.oms.server.web.request.SaveContainerInfoRequest;
import com.github.kfcfans.oms.server.web.response.ContainerInfoVO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,9 +45,19 @@ public class ContainerController {
@Resource
private ContainerInfoRepository containerInfoRepository;
- @PostMapping("/downloadContainerTemplate")
- public void downloadContainerTemplate(@RequestBody GenerateContainerTemplateRequest req, HttpServletResponse response) throws Exception {
+ @Resource
+ private ContainerService containerService;
+ @GetMapping("/downloadJar")
+ public void downloadJar(String md5, HttpServletResponse response) throws IOException {
+ File file = containerService.fetchContainerJarFile(md5);
+ if (file.exists()) {
+ OmsFileUtils.file2HttpResponse(file, response);
+ }
+ }
+
+ @PostMapping("/downloadContainerTemplate")
+ public void downloadContainerTemplate(@RequestBody GenerateContainerTemplateRequest req, HttpServletResponse response) throws IOException {
File zipFile = ContainerTemplateGenerator.generate(req.getGroup(), req.getArtifact(), req.getName(), req.getPackageName(), req.getJavaVersion());
OmsFileUtils.file2HttpResponse(zipFile, response);
}
@@ -61,7 +73,7 @@ public class ContainerController {
String tmpFileName = UUID.randomUUID().toString() + ".jar";
tmpFileName = StringUtils.replace(tmpFileName, "-", "");
File jarFile = new File(path + tmpFileName);
- OmsFileUtils.forceMkdir4Parent(jarFile);
+ FileUtils.forceMkdirParent(jarFile);
file.transferTo(jarFile);
log.debug("[ContainerController] upload jarFile({}) to local disk success.", tmpFileName);
@@ -69,16 +81,16 @@ public class ContainerController {
// 2. 检查是否符合标准(是否为Jar,是否符合 template)
// 3. 生成MD5
- String realFileName;
+ String md5;
try(FileInputStream fis = new FileInputStream(jarFile)) {
- realFileName = DigestUtils.md5DigestAsHex(fis);
+ md5 = DigestUtils.md5DigestAsHex(fis);
}
// 3. 推送到 mongoDB
if (gridFsTemplate != null) {
}
- return ResultDTO.success(realFileName);
+ return ResultDTO.success(md5);
}
@PostMapping("/save")
@@ -101,7 +113,7 @@ public class ContainerController {
if (request.getSourceType() == ContainerSourceType.Git) {
}else {
- containerInfoDO.setFileName(request.getSourceInfo());
+ containerInfoDO.setMd5(request.getSourceInfo());
}
containerInfoRepository.saveAndFlush(containerInfoDO);
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/ContainerInfoVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/ContainerInfoVO.java
index b1d9c6a7..85afbb98 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/ContainerInfoVO.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/ContainerInfoVO.java
@@ -12,8 +12,8 @@ import java.util.Date;
*/
@Data
public class ContainerInfoVO {
- private Long id;
+ private Long id;
// 所属的应用ID
private Long appId;
@@ -24,8 +24,8 @@ public class ContainerInfoVO {
// 由 sourceType 决定,JarFile -> String,存储文件名称;Git -> JSON,包括 URL,branch,username,password
private String sourceInfo;
- // 文件名称(jar的MD5,唯一,作为 GridFS 的文件名)
- private String fileName;
+ // jar的MD5,唯一,作为 GridFS 的文件名
+ private String md5;
// 状态,枚举值为 ContainerStatus
private Integer status;
diff --git a/oh-my-scheduler-worker/pom.xml b/oh-my-scheduler-worker/pom.xml
index 8be069ea..14a2729d 100644
--- a/oh-my-scheduler-worker/pom.xml
+++ b/oh-my-scheduler-worker/pom.xml
@@ -20,17 +20,15 @@
3.4.2
5.6.1
5.0.0-RC5
- 2.6
-
+
org.springframework
spring-context
${spring.version}
- provided
@@ -68,12 +66,7 @@
test
-
-
- commons-io
- commons-io
- ${commons.io.version}
-
+
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java
index a7cfb21f..602013f5 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java
@@ -1,17 +1,15 @@
package com.github.kfcfans.oms.worker.actors;
import akka.actor.AbstractActor;
-import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTracker;
import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTrackerPool;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
-import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStopInstanceReq;
import lombok.extern.slf4j.Slf4j;
/**
- * 普通计算节点,处理来自 JobTracker 的请求
+ * 普通计算节点,处理来自 TaskTracker 的请求
*
* @author tjq
* @since 2020/3/17
@@ -33,7 +31,6 @@ public class ProcessorTrackerActor extends AbstractActor {
*/
private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
- Long jobId = req.getInstanceInfo().getJobId();
Long instanceId = req.getInstanceInfo().getInstanceId();
// 创建 ProcessorTracker 一定能成功,且每个任务实例只会创建一个 ProcessorTracker
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/WorkerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/WorkerActor.java
index 101b5c44..e276e9b6 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/WorkerActor.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/WorkerActor.java
@@ -1,10 +1,12 @@
package com.github.kfcfans.oms.worker.actors;
import akka.actor.AbstractActor;
+import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest;
+import com.github.kfcfans.oms.worker.container.OmsContainerFactory;
import lombok.extern.slf4j.Slf4j;
/**
- * Worker节点Actor,主要用于和服务器保持心跳
+ * Worker节点Actor,接受服务器请求
*
* @author tjq
* @since 2020/3/24
@@ -15,7 +17,12 @@ public class WorkerActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
+ .match(ServerDeployContainerRequest.class, this::onReceiveServerDeployContainerRequest)
.matchAny(obj -> log.warn("[WorkerActor] receive unknown request: {}.", obj))
.build();
}
+
+ private void onReceiveServerDeployContainerRequest(ServerDeployContainerRequest request) {
+ OmsContainerFactory.deployContainer(request);
+ }
}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/OmsWorkerFileUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/OmsWorkerFileUtils.java
new file mode 100644
index 00000000..692df214
--- /dev/null
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/OmsWorkerFileUtils.java
@@ -0,0 +1,21 @@
+package com.github.kfcfans.oms.worker.common.utils;
+
+/**
+ * 文件工具类
+ *
+ * @author tjq
+ * @since 2020/5/16
+ */
+public class OmsWorkerFileUtils {
+
+ private static final String USER_HOME = System.getProperty("user.home", "oms");
+ private static final String WORKER_DIR = USER_HOME + "/oms/";
+
+ public static String getScriptDir() {
+ return WORKER_DIR + "script/";
+ }
+
+ public static String getContainerDir() {
+ return WORKER_DIR + "container/";
+ }
+}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainer.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainer.java
index 6a33b94c..7aa9695b 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainer.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainer.java
@@ -16,4 +16,12 @@ public interface OmsContainer extends LifeCycle {
* @return 处理器(可以是 MR、BD等处理器)
*/
BasicProcessor getProcessor(String className);
+
+ String getName();
+ String getMd5();
+
+ /**
+ * 尝试释放容器资源
+ */
+ void tryRelease();
}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainerFactory.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainerFactory.java
index d56ec435..b942ec3e 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainerFactory.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainerFactory.java
@@ -1,8 +1,25 @@
package com.github.kfcfans.oms.worker.container;
+import akka.actor.ActorSelection;
+import akka.pattern.Patterns;
+import com.github.kfcfans.oms.common.RemoteConstant;
+import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest;
+import com.github.kfcfans.oms.common.request.http.WorkerNeedDeployContainerRequest;
+import com.github.kfcfans.oms.common.response.AskResponse;
+import com.github.kfcfans.oms.worker.OhMyWorker;
+import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
+import com.github.kfcfans.oms.worker.common.utils.OmsWorkerFileUtils;
import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.springframework.util.StringUtils;
+import java.io.File;
+import java.net.URL;
+import java.time.Duration;
import java.util.Map;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
/**
* 容器工厂
@@ -10,8 +27,86 @@ import java.util.Map;
* @author tjq
* @since 2020/5/16
*/
+@Slf4j
public class OmsContainerFactory {
- private static final Map CARGO = Maps.newConcurrentMap();
+ private static final Map CARGO = Maps.newConcurrentMap();
+ /**
+ * 获取容器
+ * @param name 容器名称
+ * @return 容器示例,可能为 null
+ */
+ public static OmsContainer getContainer(String name) {
+
+ OmsContainer omsContainer = CARGO.get(name);
+ if (omsContainer != null) {
+ return omsContainer;
+ }
+
+ // 尝试下载
+ WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(name);
+
+ String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
+ if (StringUtils.isEmpty(serverPath)) {
+ return null;
+ }
+ ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
+ try {
+
+ CompletionStage