From fe2b9c78acf9c434cf44b25a5a900a67616439a6 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 16 May 2020 17:33:53 +0800 Subject: [PATCH] [dev] Web development really makes me a headache --- oh-my-scheduler-common/pom.xml | 8 ++ .../kfcfans/oms/common/ProcessorType.java | 3 +- .../request/ServerDeployContainerRequest.java | 31 ++++++ .../WorkerNeedDeployContainerRequest.java | 19 ++++ .../kfcfans/oms/common/utils/CommonUtils.java | 8 ++ .../kfcfans/oms/common/utils/Meaningless.java | 12 +++ .../oms/server/akka/actors/ServerActor.java | 40 ++++++++ .../utils/ContainerTemplateGenerator.java | 3 +- .../oms/server/common/utils/OmsFileUtils.java | 59 ++++++----- .../core/model/ContainerInfoDO.java | 4 +- .../repository/ContainerInfoRepository.java | 2 + .../oms/server/service/ContainerService.java | 73 ++++++++++++++ .../service/log/InstanceLogService.java | 88 ++++++++--------- .../web/controller/ContainerController.java | 26 +++-- .../server/web/response/ContainerInfoVO.java | 6 +- oh-my-scheduler-worker/pom.xml | 11 +-- .../worker/actors/ProcessorTrackerActor.java | 5 +- .../oms/worker/actors/WorkerActor.java | 9 +- .../common/utils/OmsWorkerFileUtils.java | 21 ++++ .../oms/worker/container/OmsContainer.java | 8 ++ .../worker/container/OmsContainerFactory.java | 97 ++++++++++++++++++- .../oms/worker/container/OmsJarContainer.java | 96 +++++++++++++----- .../core/processor/built/ScriptProcessor.java | 4 +- .../tracker/processor/ProcessorTracker.java | 17 +++- 24 files changed, 513 insertions(+), 137 deletions(-) create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/ServerDeployContainerRequest.java create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/WorkerNeedDeployContainerRequest.java create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/Meaningless.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/OmsWorkerFileUtils.java diff --git a/oh-my-scheduler-common/pom.xml b/oh-my-scheduler-common/pom.xml index d9d6acf4..3ceae542 100644 --- a/oh-my-scheduler-common/pom.xml +++ b/oh-my-scheduler-common/pom.xml @@ -16,6 +16,7 @@ 1.7.30 3.10 + 2.6 29.0-jre 4.4.1 2.6.4 @@ -61,6 +62,13 @@ akka-serialization-jackson_2.13 ${akka.version} + + + + commons-io + commons-io + ${commons.io.version} + \ No newline at end of file diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/ProcessorType.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/ProcessorType.java index a2db9ab5..98e3ba5c 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/ProcessorType.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/ProcessorType.java @@ -15,7 +15,8 @@ public enum ProcessorType { EMBEDDED_JAVA(1, "内置JAVA处理器"), SHELL(2, "SHELL脚本"), - PYTHON(3, "Python脚本"); + PYTHON(3, "Python脚本"), + JAVA_CONTAINER(4, "Java容器"); private int v; private String des; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/ServerDeployContainerRequest.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/ServerDeployContainerRequest.java new file mode 100644 index 00000000..ace2fe8f --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/ServerDeployContainerRequest.java @@ -0,0 +1,31 @@ +package com.github.kfcfans.oms.common.request; + +import com.github.kfcfans.oms.common.OmsSerializable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Worker部署Container请求 + * + * @author tjq + * @since 2020/5/16 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ServerDeployContainerRequest implements OmsSerializable { + + /** + * 容器名称 + */ + private String containerName; + /** + * 文件名(MD5值),用于做版本校验和文件下载 + */ + private String md5; + /** + * 下载地址 + */ + private String downloadURL; +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/WorkerNeedDeployContainerRequest.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/WorkerNeedDeployContainerRequest.java new file mode 100644 index 00000000..41d4a5d0 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/WorkerNeedDeployContainerRequest.java @@ -0,0 +1,19 @@ +package com.github.kfcfans.oms.common.request.http; + +import com.github.kfcfans.oms.common.OmsSerializable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Worker需要部署容器,主动向Server请求信息 + * + * @author tjq + * @since 2020/5/16 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class WorkerNeedDeployContainerRequest implements OmsSerializable { + private String containerName; +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/CommonUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/CommonUtils.java index c4d4dcbc..8e155065 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/CommonUtils.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/CommonUtils.java @@ -3,6 +3,7 @@ package com.github.kfcfans.oms.common.utils; import lombok.extern.slf4j.Slf4j; import java.util.Collection; +import java.util.function.Consumer; import java.util.function.Supplier; @@ -88,4 +89,11 @@ public class CommonUtils { }catch (Exception ignore) { } } + + public static void executeIgnoreException(Meaningless executor) { + try { + executor.m(); + }catch (Exception ignore) { + } + } } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/Meaningless.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/Meaningless.java new file mode 100644 index 00000000..ebe62ed5 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/Meaningless.java @@ -0,0 +1,12 @@ +package com.github.kfcfans.oms.common.utils; + +/** + * 毫无意义就是最大的意义 + * + * @author tjq + * @since 2020/5/16 + */ +@FunctionalInterface +public interface Meaningless { + void m() throws Exception; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java index fb6ffca9..b2f1ed7e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java @@ -2,15 +2,23 @@ package com.github.kfcfans.oms.server.akka.actors; import akka.actor.AbstractActor; import com.github.kfcfans.oms.common.InstanceStatus; +import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest; import com.github.kfcfans.oms.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.oms.common.request.WorkerHeartbeat; import com.github.kfcfans.oms.common.request.WorkerLogReportReq; +import com.github.kfcfans.oms.common.request.http.WorkerNeedDeployContainerRequest; import com.github.kfcfans.oms.common.response.AskResponse; +import com.github.kfcfans.oms.common.utils.JsonUtils; +import com.github.kfcfans.oms.common.utils.NetUtils; import com.github.kfcfans.oms.server.common.utils.SpringUtils; +import com.github.kfcfans.oms.server.persistence.core.model.ContainerInfoDO; +import com.github.kfcfans.oms.server.persistence.core.repository.ContainerInfoRepository; import com.github.kfcfans.oms.server.service.log.InstanceLogService; import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.core.env.Environment; /** * 处理 Worker 请求 @@ -27,6 +35,7 @@ public class ServerActor extends AbstractActor { .match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat) .match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq) .match(WorkerLogReportReq.class, this::onReceiveWorkerLogReportReq) + .match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest) .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) .build(); } @@ -57,8 +66,39 @@ public class ServerActor extends AbstractActor { } } + /** + * 处理OMS在线日志请求 + * @param req 日志请求 + */ private void onReceiveWorkerLogReportReq(WorkerLogReportReq req) { // 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧... SpringUtils.getBean(InstanceLogService.class).submitLogs(req.getWorkerAddress(), req.getInstanceLogContents()); } + + /** + * 处理 Worker容器部署请求 + * @param req 容器部署请求 + */ + private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) { + + ContainerInfoRepository containerInfoRepository = SpringUtils.getBean(ContainerInfoRepository.class); + Environment environment = SpringUtils.getBean(Environment.class); + String port = environment.getProperty("local.server.port"); + + ContainerInfoDO containerInfo = containerInfoRepository.findByContainerName(req.getContainerName()); + AskResponse askResponse = new AskResponse(); + askResponse.setSuccess(false); + if (containerInfo != null) { + askResponse.setSuccess(true); + + ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest(); + BeanUtils.copyProperties(containerInfo, dpReq); + String downloadURL = String.format("http://%s:%s/container/downloadJar?md5=%s", NetUtils.getLocalHost(), port, containerInfo.getMd5()); + dpReq.setDownloadURL(downloadURL); + + askResponse.setData(JsonUtils.toBytes(dpReq)); + } + + getSender().tell(askResponse, getSelf()); + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/ContainerTemplateGenerator.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/ContainerTemplateGenerator.java index 982ac4fd..08b0c085 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/ContainerTemplateGenerator.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/ContainerTemplateGenerator.java @@ -2,6 +2,7 @@ package com.github.kfcfans.oms.server.common.utils; import com.github.kfcfans.oms.common.ContainerConstant; import net.lingala.zip4j.ZipFile; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import java.io.BufferedReader; @@ -72,7 +73,7 @@ public class ContainerTemplateGenerator { // 2. 新建目录 String packagePath = StringUtils.replace(packageName, ".", "/"); String absPath = rootPath + "/src/main/java/" + packagePath; - OmsFileUtils.forceMkdir(new File(absPath)); + FileUtils.forceMkdir(new File(absPath)); // 3. 修改 Spring 配置文件 String resourcePath = rootPath + "/src/main/resources/"; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java index 35c01ea7..a179f72e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/OmsFileUtils.java @@ -2,6 +2,7 @@ package com.github.kfcfans.oms.server.common.utils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.data.mongodb.gridfs.GridFsResource; import javax.servlet.http.HttpServletResponse; import java.io.*; @@ -44,38 +45,6 @@ public class OmsFileUtils { return COMMON_PATH + "temporary/" + uuid + "/"; } - /** - * 为目标文件创建父文件夹 - * @param file 目标文件 - */ - public static void forceMkdir4Parent(File file) { - File directory = file.getParentFile(); - forceMkdir(directory); - } - - public static void forceMkdir(File directory) { - if (directory.exists()) { - if (!directory.isDirectory()) { - final String message = - "File " - + directory - + " exists and is " - + "not a directory. Unable to create directory."; - throw new RuntimeException(message); - } - } else { - if (!directory.mkdirs()) { - // Double-check that some other thread or process hasn't made - // the directory in the background - if (!directory.isDirectory()) { - final String message = - "Unable to create directory " + directory; - throw new RuntimeException(message); - } - } - } - } - /** * 将文本写入文件 * @param content 文本内容 @@ -89,6 +58,12 @@ public class OmsFileUtils { } } + /** + * 输出文件(对外下载功能) + * @param file 文件 + * @param response HTTP响应 + * @throws IOException 异常 + */ public static void file2HttpResponse(File file, HttpServletResponse response) throws IOException { response.setContentType("application/octet-stream"); @@ -103,4 +78,24 @@ public class OmsFileUtils { } } } + + /** + * 将 mongoDB 中的数据转存到本地文件中 + * @param gridFsResource mongoDB 文件资源 + * @param targetFile 本地文件资源 + */ + public static void gridFs2File(GridFsResource gridFsResource, File targetFile) { + + byte[] buffer = new byte[1024]; + try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream()); + BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(targetFile)) + ) { + while (gis.read(buffer) != -1) { + bos.write(buffer); + } + bos.flush(); + }catch (IOException ie) { + ExceptionUtils.rethrow(ie); + } + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/ContainerInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/ContainerInfoDO.java index 3bba8221..3ebb48c3 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/ContainerInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/ContainerInfoDO.java @@ -30,8 +30,8 @@ public class ContainerInfoDO { // 由 sourceType 决定,JarFile -> String,存储文件名称;Git -> JSON,包括 URL,branch,username,password private String sourceInfo; - // 文件名称(jar的MD5,唯一,作为 GridFS 的文件名) - private String fileName; + // jar的MD5,唯一,作为 GridFS 的文件名 + private String md5; // 状态,枚举值为 ContainerStatus private Integer status; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/ContainerInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/ContainerInfoRepository.java index 85c22c5f..13b1fcf3 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/ContainerInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/ContainerInfoRepository.java @@ -15,4 +15,6 @@ public interface ContainerInfoRepository extends JpaRepository findByAppId(Long appId); + ContainerInfoDO findByContainerName(String containerName); + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java new file mode 100644 index 00000000..ff1ff849 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java @@ -0,0 +1,73 @@ +package com.github.kfcfans.oms.server.service; + +import com.github.kfcfans.oms.common.utils.CommonUtils; +import com.github.kfcfans.oms.server.common.utils.OmsFileUtils; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.mongodb.gridfs.GridFsResource; +import org.springframework.data.mongodb.gridfs.GridFsTemplate; +import org.springframework.stereotype.Service; + +import java.io.File; + +/** + * 容器服务 + * + * @author tjq + * @since 2020/5/16 + */ +@Slf4j +@Service +public class ContainerService { + + private GridFsTemplate gridFsTemplate; + + /** + * 获取构建容器所需要的 Jar 文件 + * @param md5 Jar文件的MD5值,可以由此构建 mongoDB 文件名 + * @return 本地Jar文件 + */ + public File fetchContainerJarFile(String md5) { + + String jarFileName = OmsFileUtils.genContainerJarPath() + genContainerJarName(md5); + File jarFile = new File(jarFileName); + + if (jarFile.exists()) { + return jarFile; + } + if (gridFsTemplate != null) { + downloadJarFromMongoDB(genContainerJarName(md5), jarFile); + } + return jarFile; + } + + private void downloadJarFromMongoDB(String mongoFileName, File targetFile) { + synchronized (mongoFileName.intern()) { + if (targetFile.exists()) { + return; + } + GridFsResource gridFsResource = gridFsTemplate.getResource(mongoFileName); + if (!gridFsResource.exists()) { + log.warn("[ContainerService] can't find container's jar file({}) in gridFS.", mongoFileName); + return; + } + try { + OmsFileUtils.gridFs2File(gridFsResource, targetFile); + }catch (Exception e) { + CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(targetFile)); + throw e; + } + } + } + + + private static String genContainerJarName(String md5) { + return String.format("oms-container-%s.jar", md5); + } + + @Autowired(required = false) + public void setGridFsTemplate(GridFsTemplate gridFsTemplate) { + this.gridFsTemplate = gridFsTemplate; + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java index 977ae9b0..ed38c68f 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/log/InstanceLogService.java @@ -15,6 +15,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.time.FastDateFormat; import org.springframework.beans.BeanUtils; @@ -102,6 +103,7 @@ public class InstanceLogService { public StringPage fetchInstanceLog(Long instanceId, long index) { try { Future fileFuture = prepareLogFile(instanceId); + // 超时并不会打断正在执行的任务 File logFile = fileFuture.get(5, TimeUnit.SECONDS); // 分页展示数据 @@ -215,15 +217,19 @@ public class InstanceLogService { if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < EXPIRE_INTERVAL_MS) { return f; } + try { + // 创建父文件夹(文件在开流时自动会被创建) + FileUtils.forceMkdirParent(f); - // 创建父文件夹(文件在开流时自动会被创建) - OmsFileUtils.forceMkdir4Parent(f); - - // 重新构建文件 - try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) { - stream2File(allLogStream, f); + // 重新构建文件 + try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) { + stream2File(allLogStream, f); + } + return f; + }catch (Exception e) { + CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f)); + throw new RuntimeException(e); } - return f; }); } } @@ -232,36 +238,42 @@ public class InstanceLogService { String path = genLogFilePath(instanceId, true); synchronized (("stFileLock-" + instanceId).intern()) { return localTransactionTemplate.execute(status -> { + File f = new File(path); if (f.exists()) { return f; } - // 创建父文件夹(文件在开流时自动会被创建) - OmsFileUtils.forceMkdir4Parent(f); + try { + // 创建父文件夹(文件在开流时自动会被创建) + FileUtils.forceMkdirParent(f); - // 本地存在数据,从本地持久化(对应 SYNC 的情况) - if (instanceId2LastReportTime.containsKey(instanceId)) { - try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) { - stream2File(allLogStream, f); + // 本地存在数据,从本地持久化(对应 SYNC 的情况) + if (instanceId2LastReportTime.containsKey(instanceId)) { + try (Stream allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) { + stream2File(allLogStream, f); + } + }else { + + if (gridFsTemplate == null) { + OmsFileUtils.string2File("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.", f); + return f; + } + + // 否则从 mongoDB 拉取数据(对应后期查询的情况) + GridFsResource gridFsResource = gridFsTemplate.getResource(genMongoFileName(instanceId)); + + if (!gridFsResource.exists()) { + OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f); + return f; + } + OmsFileUtils.gridFs2File(gridFsResource, f); } - }else { - - if (gridFsTemplate == null) { - OmsFileUtils.string2File("SYSTEM: There is no local log for this task now, you need to use mongoDB to store the past logs.", f); - return f; - } - - // 否则从 mongoDB 拉取数据(对应后期查询的情况) - GridFsResource gridFsResource = gridFsTemplate.getResource(genMongoFileName(instanceId)); - - if (!gridFsResource.exists()) { - OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f); - return f; - } - gridFs2File(gridFsResource, f); + return f; + }catch (Exception e) { + CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f)); + throw new RuntimeException(e); } - return f; }); } } @@ -284,24 +296,6 @@ public class InstanceLogService { } } - /** - * 将MongoDB中存储的日志持久化为磁盘日志 - * @param gridFsResource mongoDB 文件资源 - * @param logFile 本地文件资源 - */ - private void gridFs2File(GridFsResource gridFsResource, File logFile) { - byte[] buffer = new byte[1024]; - try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream()); - BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile)) - ) { - while (gis.read(buffer) != -1) { - bos.write(buffer); - } - bos.flush(); - }catch (IOException ie) { - ExceptionUtils.rethrow(ie); - } - } /** diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java index c6fc8a96..65f3c794 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java @@ -6,11 +6,13 @@ import com.github.kfcfans.oms.server.common.utils.ContainerTemplateGenerator; import com.github.kfcfans.oms.server.common.utils.OmsFileUtils; import com.github.kfcfans.oms.server.persistence.core.model.ContainerInfoDO; import com.github.kfcfans.oms.server.persistence.core.repository.ContainerInfoRepository; +import com.github.kfcfans.oms.server.service.ContainerService; import com.github.kfcfans.oms.server.web.request.GenerateContainerTemplateRequest; import com.github.kfcfans.oms.server.web.request.SaveContainerInfoRequest; import com.github.kfcfans.oms.server.web.response.ContainerInfoVO; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -43,9 +45,19 @@ public class ContainerController { @Resource private ContainerInfoRepository containerInfoRepository; - @PostMapping("/downloadContainerTemplate") - public void downloadContainerTemplate(@RequestBody GenerateContainerTemplateRequest req, HttpServletResponse response) throws Exception { + @Resource + private ContainerService containerService; + @GetMapping("/downloadJar") + public void downloadJar(String md5, HttpServletResponse response) throws IOException { + File file = containerService.fetchContainerJarFile(md5); + if (file.exists()) { + OmsFileUtils.file2HttpResponse(file, response); + } + } + + @PostMapping("/downloadContainerTemplate") + public void downloadContainerTemplate(@RequestBody GenerateContainerTemplateRequest req, HttpServletResponse response) throws IOException { File zipFile = ContainerTemplateGenerator.generate(req.getGroup(), req.getArtifact(), req.getName(), req.getPackageName(), req.getJavaVersion()); OmsFileUtils.file2HttpResponse(zipFile, response); } @@ -61,7 +73,7 @@ public class ContainerController { String tmpFileName = UUID.randomUUID().toString() + ".jar"; tmpFileName = StringUtils.replace(tmpFileName, "-", ""); File jarFile = new File(path + tmpFileName); - OmsFileUtils.forceMkdir4Parent(jarFile); + FileUtils.forceMkdirParent(jarFile); file.transferTo(jarFile); log.debug("[ContainerController] upload jarFile({}) to local disk success.", tmpFileName); @@ -69,16 +81,16 @@ public class ContainerController { // 2. 检查是否符合标准(是否为Jar,是否符合 template) // 3. 生成MD5 - String realFileName; + String md5; try(FileInputStream fis = new FileInputStream(jarFile)) { - realFileName = DigestUtils.md5DigestAsHex(fis); + md5 = DigestUtils.md5DigestAsHex(fis); } // 3. 推送到 mongoDB if (gridFsTemplate != null) { } - return ResultDTO.success(realFileName); + return ResultDTO.success(md5); } @PostMapping("/save") @@ -101,7 +113,7 @@ public class ContainerController { if (request.getSourceType() == ContainerSourceType.Git) { }else { - containerInfoDO.setFileName(request.getSourceInfo()); + containerInfoDO.setMd5(request.getSourceInfo()); } containerInfoRepository.saveAndFlush(containerInfoDO); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/ContainerInfoVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/ContainerInfoVO.java index b1d9c6a7..85afbb98 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/ContainerInfoVO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/ContainerInfoVO.java @@ -12,8 +12,8 @@ import java.util.Date; */ @Data public class ContainerInfoVO { - private Long id; + private Long id; // 所属的应用ID private Long appId; @@ -24,8 +24,8 @@ public class ContainerInfoVO { // 由 sourceType 决定,JarFile -> String,存储文件名称;Git -> JSON,包括 URL,branch,username,password private String sourceInfo; - // 文件名称(jar的MD5,唯一,作为 GridFS 的文件名) - private String fileName; + // jar的MD5,唯一,作为 GridFS 的文件名 + private String md5; // 状态,枚举值为 ContainerStatus private Integer status; diff --git a/oh-my-scheduler-worker/pom.xml b/oh-my-scheduler-worker/pom.xml index 8be069ea..14a2729d 100644 --- a/oh-my-scheduler-worker/pom.xml +++ b/oh-my-scheduler-worker/pom.xml @@ -20,17 +20,15 @@ 3.4.2 5.6.1 5.0.0-RC5 - 2.6 - + org.springframework spring-context ${spring.version} - provided @@ -68,12 +66,7 @@ test - - - commons-io - commons-io - ${commons.io.version} - + diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java index a7cfb21f..602013f5 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java @@ -1,17 +1,15 @@ package com.github.kfcfans.oms.worker.actors; import akka.actor.AbstractActor; -import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTracker; import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTrackerPool; import com.github.kfcfans.oms.worker.persistence.TaskDO; -import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStopInstanceReq; import lombok.extern.slf4j.Slf4j; /** - * 普通计算节点,处理来自 JobTracker 的请求 + * 普通计算节点,处理来自 TaskTracker 的请求 * * @author tjq * @since 2020/3/17 @@ -33,7 +31,6 @@ public class ProcessorTrackerActor extends AbstractActor { */ private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) { - Long jobId = req.getInstanceInfo().getJobId(); Long instanceId = req.getInstanceInfo().getInstanceId(); // 创建 ProcessorTracker 一定能成功,且每个任务实例只会创建一个 ProcessorTracker diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/WorkerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/WorkerActor.java index 101b5c44..e276e9b6 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/WorkerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/WorkerActor.java @@ -1,10 +1,12 @@ package com.github.kfcfans.oms.worker.actors; import akka.actor.AbstractActor; +import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest; +import com.github.kfcfans.oms.worker.container.OmsContainerFactory; import lombok.extern.slf4j.Slf4j; /** - * Worker节点Actor,主要用于和服务器保持心跳 + * Worker节点Actor,接受服务器请求 * * @author tjq * @since 2020/3/24 @@ -15,7 +17,12 @@ public class WorkerActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() + .match(ServerDeployContainerRequest.class, this::onReceiveServerDeployContainerRequest) .matchAny(obj -> log.warn("[WorkerActor] receive unknown request: {}.", obj)) .build(); } + + private void onReceiveServerDeployContainerRequest(ServerDeployContainerRequest request) { + OmsContainerFactory.deployContainer(request); + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/OmsWorkerFileUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/OmsWorkerFileUtils.java new file mode 100644 index 00000000..692df214 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/OmsWorkerFileUtils.java @@ -0,0 +1,21 @@ +package com.github.kfcfans.oms.worker.common.utils; + +/** + * 文件工具类 + * + * @author tjq + * @since 2020/5/16 + */ +public class OmsWorkerFileUtils { + + private static final String USER_HOME = System.getProperty("user.home", "oms"); + private static final String WORKER_DIR = USER_HOME + "/oms/"; + + public static String getScriptDir() { + return WORKER_DIR + "script/"; + } + + public static String getContainerDir() { + return WORKER_DIR + "container/"; + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainer.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainer.java index 6a33b94c..7aa9695b 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainer.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainer.java @@ -16,4 +16,12 @@ public interface OmsContainer extends LifeCycle { * @return 处理器(可以是 MR、BD等处理器) */ BasicProcessor getProcessor(String className); + + String getName(); + String getMd5(); + + /** + * 尝试释放容器资源 + */ + void tryRelease(); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainerFactory.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainerFactory.java index d56ec435..b942ec3e 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainerFactory.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainerFactory.java @@ -1,8 +1,25 @@ package com.github.kfcfans.oms.worker.container; +import akka.actor.ActorSelection; +import akka.pattern.Patterns; +import com.github.kfcfans.oms.common.RemoteConstant; +import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest; +import com.github.kfcfans.oms.common.request.http.WorkerNeedDeployContainerRequest; +import com.github.kfcfans.oms.common.response.AskResponse; +import com.github.kfcfans.oms.worker.OhMyWorker; +import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; +import com.github.kfcfans.oms.worker.common.utils.OmsWorkerFileUtils; import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; +import org.springframework.util.StringUtils; +import java.io.File; +import java.net.URL; +import java.time.Duration; import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; /** * 容器工厂 @@ -10,8 +27,86 @@ import java.util.Map; * @author tjq * @since 2020/5/16 */ +@Slf4j public class OmsContainerFactory { - private static final Map CARGO = Maps.newConcurrentMap(); + private static final Map CARGO = Maps.newConcurrentMap(); + /** + * 获取容器 + * @param name 容器名称 + * @return 容器示例,可能为 null + */ + public static OmsContainer getContainer(String name) { + + OmsContainer omsContainer = CARGO.get(name); + if (omsContainer != null) { + return omsContainer; + } + + // 尝试下载 + WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(name); + + String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); + if (StringUtils.isEmpty(serverPath)) { + return null; + } + ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); + try { + + CompletionStage askCS = Patterns.ask(serverActor, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + if (askResponse.isSuccess()) { + ServerDeployContainerRequest deployRequest = askResponse.getData(ServerDeployContainerRequest.class); + deployContainer(deployRequest); + } + }catch (Exception e) { + log.error("[OmsContainer] get container(name={}) failed.", name, e); + } + + return CARGO.get(name); + } + + + /** + * 部署容器,整个过程串行进行,问题不大 + * @param request 部署容器请求 + */ + public static synchronized void deployContainer(ServerDeployContainerRequest request) { + + String containerName = request.getContainerName(); + String md5 = request.getMd5(); + + OmsContainer oldContainer = CARGO.get(containerName); + if (oldContainer != null && md5.equals(oldContainer.getMd5())) { + log.info("[OmsContainerFactory] container(name={},md5={}) already deployed.", containerName, md5); + return; + } + + try { + + // 下载Container到本地 + String filePath = OmsWorkerFileUtils.getContainerDir() + containerName + "/" + md5 + ".jar"; + File jarFile = new File(filePath); + FileUtils.forceMkdirParent(jarFile); + FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000); + + // 创建新容器 + OmsContainer newContainer = new OmsJarContainer(containerName, md5, jarFile); + newContainer.init(); + + // 替换容器 + CARGO.put(containerName, newContainer); + log.info("[OmsContainerFactory] container(name={},md5={}) deployed successfully.", containerName, md5); + + if (oldContainer != null) { + // 销毁旧容器 + oldContainer.destroy(); + } + + }catch (Exception e) { + log.error("[OmsContainerFactory] deploy container(name={},md5={}) failed.", containerName, md5, e); + } + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java index 0fdca3f8..3fb16ea9 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java @@ -1,11 +1,11 @@ package com.github.kfcfans.oms.worker.container; import com.github.kfcfans.oms.common.ContainerConstant; +import com.github.kfcfans.oms.common.utils.CommonUtils; import com.github.kfcfans.oms.worker.common.OmsWorkerException; import com.github.kfcfans.oms.worker.core.classloader.OhMyClassLoader; import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; import com.google.common.collect.Maps; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.context.support.ClassPathXmlApplicationContext; @@ -16,6 +16,7 @@ import java.io.InputStream; import java.net.URL; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; /** * OMS 容器实现 @@ -26,30 +27,32 @@ import java.util.Properties; @Slf4j public class OmsJarContainer implements OmsContainer { - @Getter - private final Long id; private final String name; - private final String localJarPath; + private final String md5; + private final File localJarFile; + + // 引用计数器 + private final AtomicInteger referenceCount = new AtomicInteger(0); private OhMyClassLoader containerClassLoader; private ClassPathXmlApplicationContext container; private Map processorCache = Maps.newConcurrentMap(); - public OmsJarContainer(Long containerId, String containerName, String localJarPath) { - this.id = containerId; - this.name = containerName; - this.localJarPath = localJarPath; + public OmsJarContainer(String name, String md5, File localJarFile) { + this.name = name; + this.md5 = md5; + this.localJarFile = localJarFile; } @Override public BasicProcessor getProcessor(String className) { - return processorCache.computeIfAbsent(className, ignore -> { + BasicProcessor basicProcessor = processorCache.computeIfAbsent(className, ignore -> { Class targetClass; try { targetClass = containerClassLoader.loadClass(className); - }catch (ClassNotFoundException cnf) { + } catch (ClassNotFoundException cnf) { log.error("[OmsJarContainer-{}] can't find class: {} in container.", name, className); return null; } @@ -57,11 +60,14 @@ public class OmsJarContainer implements OmsContainer { // 先尝试从 Spring IOC 容器加载 try { return (BasicProcessor) container.getBean(targetClass); - }catch (BeansException be) { + } catch (BeansException be) { log.warn("[OmsJarContainer-{}] load instance from spring container failed, try to build instance directly.", name); - }catch (ClassCastException cce) { + } catch (ClassCastException cce) { log.error("[OmsJarContainer-{}] {} should implements the Processor interface!", name, className); return null; + } catch (Exception e) { + log.error("[OmsJarContainer-{}] get bean failed for {}.", name, className, e); + return null; } // 直接实例化 @@ -70,20 +76,25 @@ public class OmsJarContainer implements OmsContainer { BasicProcessor processor = (BasicProcessor) obj; processor.init(); return processor; - }catch (Exception e) { + } catch (Exception e) { log.error("[OmsJarContainer-{}] load {} failed", name, className, e); } return null; }); + + if (basicProcessor != null) { + // 引用计数 + 1 + referenceCount.getAndIncrement(); + } + return basicProcessor; } @Override public void init() throws Exception { - log.info("[OmsJarContainer] start to init container(id={},name={},jarPath={})", id, name, localJarPath); + log.info("[OmsJarContainer] start to init container(name={},jarPath={})", name, localJarFile.getPath()); - File file = new File(localJarPath); - URL jarURL = file.toURI().toURL(); + URL jarURL = localJarFile.toURI().toURL(); // 创建类加载器 this.containerClassLoader = new OhMyClassLoader(new URL[]{jarURL}, this.getClass().getClassLoader()); @@ -93,11 +104,11 @@ public class OmsJarContainer implements OmsContainer { URL springXmlURL = containerClassLoader.getResource(ContainerConstant.SPRING_CONTEXT_FILE_NAME); if (propertiesURL == null) { - log.error("[OmsJarContainer] can't find {} in jar {}.", ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarPath); + log.error("[OmsJarContainer] can't find {} in jar {}.", ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath()); throw new OmsWorkerException("invalid jar file"); } if (springXmlURL == null) { - log.error("[OmsJarContainer] can't find {} in jar {}.", ContainerConstant.SPRING_CONTEXT_FILE_NAME, localJarPath); + log.error("[OmsJarContainer] can't find {} in jar {}.", ContainerConstant.SPRING_CONTEXT_FILE_NAME, localJarFile.getPath()); throw new OmsWorkerException("invalid jar file"); } @@ -121,18 +132,51 @@ public class OmsJarContainer implements OmsContainer { this.container.setClassLoader(containerClassLoader); this.container.refresh(); - log.info("[OmsJarContainer] init container(id={},name={},jarPath={}) successfully", id, name, localJarPath); + log.info("[OmsJarContainer] init container(name={},jarPath={}) successfully", name, localJarFile.getPath()); } @Override public void destroy() throws Exception { - try { - processorCache.clear(); - container.close(); - containerClassLoader.close(); - log.info("[OmsJarContainer-{}] container destroyed successfully", name); - }catch (Exception e) { - log.error("[OmsJarContainer-{}] container destroyed failed", name, e); + + // 没有其余引用时,才允许执行 destroy + if (referenceCount.get() <= 0) { + try { + processorCache.clear(); + container.close(); + containerClassLoader.close(); + log.info("[OmsJarContainer-{}] container destroyed successfully", name); + }catch (Exception e) { + log.error("[OmsJarContainer-{}] container destroyed failed", name, e); + } + return; + } + + log.warn("[OmsJarContainer-{}] container's reference count is {}, won't destroy now!", name, referenceCount.get()); + } + + @Override + public String getName() { + return name; + } + @Override + public String getMd5() { + return md5; + } + + @Override + public void tryRelease() { + + log.debug("[OmsJarContainer-{}] tryRelease, current reference is {}.", name, referenceCount.get()); + // 需要满足的条件:引用计数器减为0 & 有更新的容器出现 + if (referenceCount.decrementAndGet() <= 0) { + + OmsContainer container = OmsContainerFactory.getContainer(name); + if (container != this) { + try { + destroy(); + }catch (Exception ignore) { + } + } } } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ScriptProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ScriptProcessor.java index f054090d..f3e3025b 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ScriptProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ScriptProcessor.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.worker.core.processor.built; +import com.github.kfcfans.oms.worker.common.utils.OmsWorkerFileUtils; import com.github.kfcfans.oms.worker.core.processor.ProcessResult; import com.github.kfcfans.oms.worker.core.processor.TaskContext; import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; @@ -30,13 +31,12 @@ public abstract class ScriptProcessor implements BasicProcessor { private final long timeout; private final ExecutorService threadPool; - private static final String USER_HOME = System.getProperty("user.home", "oms"); private static final Set DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp"); public ScriptProcessor(Long instanceId, String processorInfo, long timeout, ExecutorService pool) throws Exception { this.instanceId = instanceId; - this.scriptPath = USER_HOME + "/oms/script/" + genScriptName(instanceId); + this.scriptPath = OmsWorkerFileUtils.getScriptDir() + genScriptName(instanceId); this.timeout = timeout; this.threadPool = pool; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index 7bc23fef..cd9b6253 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -10,6 +10,8 @@ import com.github.kfcfans.oms.common.RemoteConstant; import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.common.utils.SpringUtils; +import com.github.kfcfans.oms.worker.container.OmsContainer; +import com.github.kfcfans.oms.worker.container.OmsContainerFactory; import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory; import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable; import com.github.kfcfans.oms.worker.core.processor.built.PythonProcessor; @@ -47,6 +49,8 @@ public class ProcessorTracker { // 任务执行器 private BasicProcessor processor; + // 容器(可能为空) + private OmsContainer omsContainer; // 在线日志 private OmsLogger omsLogger; @@ -148,13 +152,17 @@ public class ProcessorTracker { */ public void destroy() { + // 0. 移除Container引用 + if (omsContainer != null) { + omsContainer.tryRelease(); + } + // 1. 关闭执行执行线程池 CommonUtils.executeIgnoreException(() -> { List tasks = threadPool.shutdownNow(); if (!CollectionUtils.isEmpty(tasks)) { log.warn("[ProcessorTracker-{}] shutdown threadPool now and stop {} tasks.", instanceId, tasks.size()); } - return null; }); // 2. 去除顶层引用,送入GC世界 @@ -255,6 +263,13 @@ public class ProcessorTracker { case PYTHON: processor = new PythonProcessor(instanceId, processorInfo, instanceInfo.getInstanceTimeoutMS(), threadPool); break; + case JAVA_CONTAINER: + String[] split = processorInfo.split("#"); + omsContainer = OmsContainerFactory.getContainer(split[0]); + if (omsContainer != null) { + processor = omsContainer.getProcessor(split[1]); + } + break; default: log.warn("[ProcessorRunnable-{}] unknown processor type: {}.", instanceId, processorType); throw new IllegalArgumentException("unknown processor type of " + processorType);