[dev] Web development really makes me a headache

This commit is contained in:
tjq 2020-05-16 17:33:53 +08:00
parent 456a1a8f25
commit fe2b9c78ac
24 changed files with 513 additions and 137 deletions

View File

@ -16,6 +16,7 @@
<properties>
<slf4j.version>1.7.30</slf4j.version>
<commons.lang.version>3.10</commons.lang.version>
<commons.io.version>2.6</commons.io.version>
<guava.version>29.0-jre</guava.version>
<okhttp.version>4.4.1</okhttp.version>
<akka.version>2.6.4</akka.version>
@ -61,6 +62,13 @@
<artifactId>akka-serialization-jackson_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons.io.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -15,7 +15,8 @@ public enum ProcessorType {
EMBEDDED_JAVA(1, "内置JAVA处理器"),
SHELL(2, "SHELL脚本"),
PYTHON(3, "Python脚本");
PYTHON(3, "Python脚本"),
JAVA_CONTAINER(4, "Java容器");
private int v;
private String des;

View File

@ -0,0 +1,31 @@
package com.github.kfcfans.oms.common.request;
import com.github.kfcfans.oms.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Worker部署Container请求
*
* @author tjq
* @since 2020/5/16
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerDeployContainerRequest implements OmsSerializable {
/**
* 容器名称
*/
private String containerName;
/**
* 文件名MD5值用于做版本校验和文件下载
*/
private String md5;
/**
* 下载地址
*/
private String downloadURL;
}

View File

@ -0,0 +1,19 @@
package com.github.kfcfans.oms.common.request.http;
import com.github.kfcfans.oms.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Worker需要部署容器主动向Server请求信息
*
* @author tjq
* @since 2020/5/16
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WorkerNeedDeployContainerRequest implements OmsSerializable {
private String containerName;
}

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.oms.common.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -88,4 +89,11 @@ public class CommonUtils {
}catch (Exception ignore) {
}
}
public static void executeIgnoreException(Meaningless executor) {
try {
executor.m();
}catch (Exception ignore) {
}
}
}

View File

@ -0,0 +1,12 @@
package com.github.kfcfans.oms.common.utils;
/**
* 毫无意义就是最大的意义
*
* @author tjq
* @since 2020/5/16
*/
@FunctionalInterface
public interface Meaningless {
void m() throws Exception;
}

View File

@ -2,15 +2,23 @@ package com.github.kfcfans.oms.server.akka.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.oms.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.oms.common.request.WorkerHeartbeat;
import com.github.kfcfans.oms.common.request.WorkerLogReportReq;
import com.github.kfcfans.oms.common.request.http.WorkerNeedDeployContainerRequest;
import com.github.kfcfans.oms.common.response.AskResponse;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.common.utils.NetUtils;
import com.github.kfcfans.oms.server.common.utils.SpringUtils;
import com.github.kfcfans.oms.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.ContainerInfoRepository;
import com.github.kfcfans.oms.server.service.log.InstanceLogService;
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.core.env.Environment;
/**
* 处理 Worker 请求
@ -27,6 +35,7 @@ public class ServerActor extends AbstractActor {
.match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat)
.match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq)
.match(WorkerLogReportReq.class, this::onReceiveWorkerLogReportReq)
.match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build();
}
@ -57,8 +66,39 @@ public class ServerActor extends AbstractActor {
}
}
/**
* 处理OMS在线日志请求
* @param req 日志请求
*/
private void onReceiveWorkerLogReportReq(WorkerLogReportReq req) {
// 这个效率应该不会拉垮吧...也就是一些判断 + Map#get ...
SpringUtils.getBean(InstanceLogService.class).submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
}
/**
* 处理 Worker容器部署请求
* @param req 容器部署请求
*/
private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) {
ContainerInfoRepository containerInfoRepository = SpringUtils.getBean(ContainerInfoRepository.class);
Environment environment = SpringUtils.getBean(Environment.class);
String port = environment.getProperty("local.server.port");
ContainerInfoDO containerInfo = containerInfoRepository.findByContainerName(req.getContainerName());
AskResponse askResponse = new AskResponse();
askResponse.setSuccess(false);
if (containerInfo != null) {
askResponse.setSuccess(true);
ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest();
BeanUtils.copyProperties(containerInfo, dpReq);
String downloadURL = String.format("http://%s:%s/container/downloadJar?md5=%s", NetUtils.getLocalHost(), port, containerInfo.getMd5());
dpReq.setDownloadURL(downloadURL);
askResponse.setData(JsonUtils.toBytes(dpReq));
}
getSender().tell(askResponse, getSelf());
}
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.oms.server.common.utils;
import com.github.kfcfans.oms.common.ContainerConstant;
import net.lingala.zip4j.ZipFile;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
@ -72,7 +73,7 @@ public class ContainerTemplateGenerator {
// 2. 新建目录
String packagePath = StringUtils.replace(packageName, ".", "/");
String absPath = rootPath + "/src/main/java/" + packagePath;
OmsFileUtils.forceMkdir(new File(absPath));
FileUtils.forceMkdir(new File(absPath));
// 3. 修改 Spring 配置文件
String resourcePath = rootPath + "/src/main/resources/";

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.oms.server.common.utils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.data.mongodb.gridfs.GridFsResource;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
@ -44,38 +45,6 @@ public class OmsFileUtils {
return COMMON_PATH + "temporary/" + uuid + "/";
}
/**
* 为目标文件创建父文件夹
* @param file 目标文件
*/
public static void forceMkdir4Parent(File file) {
File directory = file.getParentFile();
forceMkdir(directory);
}
public static void forceMkdir(File directory) {
if (directory.exists()) {
if (!directory.isDirectory()) {
final String message =
"File "
+ directory
+ " exists and is "
+ "not a directory. Unable to create directory.";
throw new RuntimeException(message);
}
} else {
if (!directory.mkdirs()) {
// Double-check that some other thread or process hasn't made
// the directory in the background
if (!directory.isDirectory()) {
final String message =
"Unable to create directory " + directory;
throw new RuntimeException(message);
}
}
}
}
/**
* 将文本写入文件
* @param content 文本内容
@ -89,6 +58,12 @@ public class OmsFileUtils {
}
}
/**
* 输出文件对外下载功能
* @param file 文件
* @param response HTTP响应
* @throws IOException 异常
*/
public static void file2HttpResponse(File file, HttpServletResponse response) throws IOException {
response.setContentType("application/octet-stream");
@ -103,4 +78,24 @@ public class OmsFileUtils {
}
}
}
/**
* mongoDB 中的数据转存到本地文件中
* @param gridFsResource mongoDB 文件资源
* @param targetFile 本地文件资源
*/
public static void gridFs2File(GridFsResource gridFsResource, File targetFile) {
byte[] buffer = new byte[1024];
try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream());
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(targetFile))
) {
while (gis.read(buffer) != -1) {
bos.write(buffer);
}
bos.flush();
}catch (IOException ie) {
ExceptionUtils.rethrow(ie);
}
}
}

View File

@ -30,8 +30,8 @@ public class ContainerInfoDO {
// sourceType 决定JarFile -> String存储文件名称Git -> JSON包括 URLbranchusernamepassword
private String sourceInfo;
// 文件名称jar的MD5唯一作为 GridFS 的文件名
private String fileName;
// jar的MD5唯一作为 GridFS 的文件名
private String md5;
// 状态枚举值为 ContainerStatus
private Integer status;

View File

@ -15,4 +15,6 @@ public interface ContainerInfoRepository extends JpaRepository<ContainerInfoDO,
List<ContainerInfoDO> findByAppId(Long appId);
ContainerInfoDO findByContainerName(String containerName);
}

View File

@ -0,0 +1,73 @@
package com.github.kfcfans.oms.server.service;
import com.github.kfcfans.oms.common.utils.CommonUtils;
import com.github.kfcfans.oms.server.common.utils.OmsFileUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.gridfs.GridFsResource;
import org.springframework.data.mongodb.gridfs.GridFsTemplate;
import org.springframework.stereotype.Service;
import java.io.File;
/**
* 容器服务
*
* @author tjq
* @since 2020/5/16
*/
@Slf4j
@Service
public class ContainerService {
private GridFsTemplate gridFsTemplate;
/**
* 获取构建容器所需要的 Jar 文件
* @param md5 Jar文件的MD5值可以由此构建 mongoDB 文件名
* @return 本地Jar文件
*/
public File fetchContainerJarFile(String md5) {
String jarFileName = OmsFileUtils.genContainerJarPath() + genContainerJarName(md5);
File jarFile = new File(jarFileName);
if (jarFile.exists()) {
return jarFile;
}
if (gridFsTemplate != null) {
downloadJarFromMongoDB(genContainerJarName(md5), jarFile);
}
return jarFile;
}
private void downloadJarFromMongoDB(String mongoFileName, File targetFile) {
synchronized (mongoFileName.intern()) {
if (targetFile.exists()) {
return;
}
GridFsResource gridFsResource = gridFsTemplate.getResource(mongoFileName);
if (!gridFsResource.exists()) {
log.warn("[ContainerService] can't find container's jar file({}) in gridFS.", mongoFileName);
return;
}
try {
OmsFileUtils.gridFs2File(gridFsResource, targetFile);
}catch (Exception e) {
CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(targetFile));
throw e;
}
}
}
private static String genContainerJarName(String md5) {
return String.format("oms-container-%s.jar", md5);
}
@Autowired(required = false)
public void setGridFsTemplate(GridFsTemplate gridFsTemplate) {
this.gridFsTemplate = gridFsTemplate;
}
}

View File

@ -15,6 +15,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.springframework.beans.BeanUtils;
@ -102,6 +103,7 @@ public class InstanceLogService {
public StringPage fetchInstanceLog(Long instanceId, long index) {
try {
Future<File> fileFuture = prepareLogFile(instanceId);
// 超时并不会打断正在执行的任务
File logFile = fileFuture.get(5, TimeUnit.SECONDS);
// 分页展示数据
@ -215,15 +217,19 @@ public class InstanceLogService {
if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < EXPIRE_INTERVAL_MS) {
return f;
}
try {
// 创建父文件夹文件在开流时自动会被创建
OmsFileUtils.forceMkdir4Parent(f);
FileUtils.forceMkdirParent(f);
// 重新构建文件
try (Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
stream2File(allLogStream, f);
}
return f;
}catch (Exception e) {
CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f));
throw new RuntimeException(e);
}
});
}
}
@ -232,13 +238,15 @@ public class InstanceLogService {
String path = genLogFilePath(instanceId, true);
synchronized (("stFileLock-" + instanceId).intern()) {
return localTransactionTemplate.execute(status -> {
File f = new File(path);
if (f.exists()) {
return f;
}
try {
// 创建父文件夹文件在开流时自动会被创建
OmsFileUtils.forceMkdir4Parent(f);
FileUtils.forceMkdirParent(f);
// 本地存在数据从本地持久化对应 SYNC 的情况
if (instanceId2LastReportTime.containsKey(instanceId)) {
@ -259,9 +267,13 @@ public class InstanceLogService {
OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f);
return f;
}
gridFs2File(gridFsResource, f);
OmsFileUtils.gridFs2File(gridFsResource, f);
}
return f;
}catch (Exception e) {
CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f));
throw new RuntimeException(e);
}
});
}
}
@ -284,24 +296,6 @@ public class InstanceLogService {
}
}
/**
* 将MongoDB中存储的日志持久化为磁盘日志
* @param gridFsResource mongoDB 文件资源
* @param logFile 本地文件资源
*/
private void gridFs2File(GridFsResource gridFsResource, File logFile) {
byte[] buffer = new byte[1024];
try (BufferedInputStream gis = new BufferedInputStream(gridFsResource.getInputStream());
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(logFile))
) {
while (gis.read(buffer) != -1) {
bos.write(buffer);
}
bos.flush();
}catch (IOException ie) {
ExceptionUtils.rethrow(ie);
}
}
/**

View File

@ -6,11 +6,13 @@ import com.github.kfcfans.oms.server.common.utils.ContainerTemplateGenerator;
import com.github.kfcfans.oms.server.common.utils.OmsFileUtils;
import com.github.kfcfans.oms.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.ContainerInfoRepository;
import com.github.kfcfans.oms.server.service.ContainerService;
import com.github.kfcfans.oms.server.web.request.GenerateContainerTemplateRequest;
import com.github.kfcfans.oms.server.web.request.SaveContainerInfoRequest;
import com.github.kfcfans.oms.server.web.response.ContainerInfoVO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -43,9 +45,19 @@ public class ContainerController {
@Resource
private ContainerInfoRepository containerInfoRepository;
@PostMapping("/downloadContainerTemplate")
public void downloadContainerTemplate(@RequestBody GenerateContainerTemplateRequest req, HttpServletResponse response) throws Exception {
@Resource
private ContainerService containerService;
@GetMapping("/downloadJar")
public void downloadJar(String md5, HttpServletResponse response) throws IOException {
File file = containerService.fetchContainerJarFile(md5);
if (file.exists()) {
OmsFileUtils.file2HttpResponse(file, response);
}
}
@PostMapping("/downloadContainerTemplate")
public void downloadContainerTemplate(@RequestBody GenerateContainerTemplateRequest req, HttpServletResponse response) throws IOException {
File zipFile = ContainerTemplateGenerator.generate(req.getGroup(), req.getArtifact(), req.getName(), req.getPackageName(), req.getJavaVersion());
OmsFileUtils.file2HttpResponse(zipFile, response);
}
@ -61,7 +73,7 @@ public class ContainerController {
String tmpFileName = UUID.randomUUID().toString() + ".jar";
tmpFileName = StringUtils.replace(tmpFileName, "-", "");
File jarFile = new File(path + tmpFileName);
OmsFileUtils.forceMkdir4Parent(jarFile);
FileUtils.forceMkdirParent(jarFile);
file.transferTo(jarFile);
log.debug("[ContainerController] upload jarFile({}) to local disk success.", tmpFileName);
@ -69,16 +81,16 @@ public class ContainerController {
// 2. 检查是否符合标准是否为Jar是否符合 template
// 3. 生成MD5
String realFileName;
String md5;
try(FileInputStream fis = new FileInputStream(jarFile)) {
realFileName = DigestUtils.md5DigestAsHex(fis);
md5 = DigestUtils.md5DigestAsHex(fis);
}
// 3. 推送到 mongoDB
if (gridFsTemplate != null) {
}
return ResultDTO.success(realFileName);
return ResultDTO.success(md5);
}
@PostMapping("/save")
@ -101,7 +113,7 @@ public class ContainerController {
if (request.getSourceType() == ContainerSourceType.Git) {
}else {
containerInfoDO.setFileName(request.getSourceInfo());
containerInfoDO.setMd5(request.getSourceInfo());
}
containerInfoRepository.saveAndFlush(containerInfoDO);

View File

@ -12,8 +12,8 @@ import java.util.Date;
*/
@Data
public class ContainerInfoVO {
private Long id;
private Long id;
// 所属的应用ID
private Long appId;
@ -24,8 +24,8 @@ public class ContainerInfoVO {
// sourceType 决定JarFile -> String存储文件名称Git -> JSON包括 URLbranchusernamepassword
private String sourceInfo;
// 文件名称jar的MD5唯一作为 GridFS 的文件名
private String fileName;
// jar的MD5唯一作为 GridFS 的文件名
private String md5;
// 状态枚举值为 ContainerStatus
private Integer status;

View File

@ -20,17 +20,15 @@
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>
<kryo.version>5.0.0-RC5</kryo.version>
<commons.io.version>2.6</commons.io.version>
</properties>
<dependencies>
<!-- Spring 编译期依赖 -->
<!-- Spring 依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
<scope>provided</scope>
</dependency>
<!-- oms-common -->
@ -68,12 +66,7 @@
<scope>test</scope>
</dependency>
<!-- commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons.io.version}</version>
</dependency>
</dependencies>

View File

@ -1,17 +1,15 @@
package com.github.kfcfans.oms.worker.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTracker;
import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTrackerPool;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStopInstanceReq;
import lombok.extern.slf4j.Slf4j;
/**
* 普通计算节点处理来自 JobTracker 的请求
* 普通计算节点处理来自 TaskTracker 的请求
*
* @author tjq
* @since 2020/3/17
@ -33,7 +31,6 @@ public class ProcessorTrackerActor extends AbstractActor {
*/
private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
Long jobId = req.getInstanceInfo().getJobId();
Long instanceId = req.getInstanceInfo().getInstanceId();
// 创建 ProcessorTracker 一定能成功且每个任务实例只会创建一个 ProcessorTracker

View File

@ -1,10 +1,12 @@
package com.github.kfcfans.oms.worker.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.oms.worker.container.OmsContainerFactory;
import lombok.extern.slf4j.Slf4j;
/**
* Worker节点Actor主要用于和服务器保持心跳
* Worker节点Actor接受服务器请求
*
* @author tjq
* @since 2020/3/24
@ -15,7 +17,12 @@ public class WorkerActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ServerDeployContainerRequest.class, this::onReceiveServerDeployContainerRequest)
.matchAny(obj -> log.warn("[WorkerActor] receive unknown request: {}.", obj))
.build();
}
private void onReceiveServerDeployContainerRequest(ServerDeployContainerRequest request) {
OmsContainerFactory.deployContainer(request);
}
}

View File

@ -0,0 +1,21 @@
package com.github.kfcfans.oms.worker.common.utils;
/**
* 文件工具类
*
* @author tjq
* @since 2020/5/16
*/
public class OmsWorkerFileUtils {
private static final String USER_HOME = System.getProperty("user.home", "oms");
private static final String WORKER_DIR = USER_HOME + "/oms/";
public static String getScriptDir() {
return WORKER_DIR + "script/";
}
public static String getContainerDir() {
return WORKER_DIR + "container/";
}
}

View File

@ -16,4 +16,12 @@ public interface OmsContainer extends LifeCycle {
* @return 处理器可以是 MRBD等处理器
*/
BasicProcessor getProcessor(String className);
String getName();
String getMd5();
/**
* 尝试释放容器资源
*/
void tryRelease();
}

View File

@ -1,8 +1,25 @@
package com.github.kfcfans.oms.worker.container;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.oms.common.RemoteConstant;
import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.oms.common.request.http.WorkerNeedDeployContainerRequest;
import com.github.kfcfans.oms.common.response.AskResponse;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.OmsWorkerFileUtils;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.util.StringUtils;
import java.io.File;
import java.net.URL;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* 容器工厂
@ -10,8 +27,86 @@ import java.util.Map;
* @author tjq
* @since 2020/5/16
*/
@Slf4j
public class OmsContainerFactory {
private static final Map<Long, OmsContainer> CARGO = Maps.newConcurrentMap();
private static final Map<String, OmsContainer> CARGO = Maps.newConcurrentMap();
/**
* 获取容器
* @param name 容器名称
* @return 容器示例可能为 null
*/
public static OmsContainer getContainer(String name) {
OmsContainer omsContainer = CARGO.get(name);
if (omsContainer != null) {
return omsContainer;
}
// 尝试下载
WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(name);
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
if (StringUtils.isEmpty(serverPath)) {
return null;
}
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
try {
CompletionStage<Object> askCS = Patterns.ask(serverActor, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (askResponse.isSuccess()) {
ServerDeployContainerRequest deployRequest = askResponse.getData(ServerDeployContainerRequest.class);
deployContainer(deployRequest);
}
}catch (Exception e) {
log.error("[OmsContainer] get container(name={}) failed.", name, e);
}
return CARGO.get(name);
}
/**
* 部署容器整个过程串行进行问题不大
* @param request 部署容器请求
*/
public static synchronized void deployContainer(ServerDeployContainerRequest request) {
String containerName = request.getContainerName();
String md5 = request.getMd5();
OmsContainer oldContainer = CARGO.get(containerName);
if (oldContainer != null && md5.equals(oldContainer.getMd5())) {
log.info("[OmsContainerFactory] container(name={},md5={}) already deployed.", containerName, md5);
return;
}
try {
// 下载Container到本地
String filePath = OmsWorkerFileUtils.getContainerDir() + containerName + "/" + md5 + ".jar";
File jarFile = new File(filePath);
FileUtils.forceMkdirParent(jarFile);
FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000);
// 创建新容器
OmsContainer newContainer = new OmsJarContainer(containerName, md5, jarFile);
newContainer.init();
// 替换容器
CARGO.put(containerName, newContainer);
log.info("[OmsContainerFactory] container(name={},md5={}) deployed successfully.", containerName, md5);
if (oldContainer != null) {
// 销毁旧容器
oldContainer.destroy();
}
}catch (Exception e) {
log.error("[OmsContainerFactory] deploy container(name={},md5={}) failed.", containerName, md5, e);
}
}
}

View File

@ -1,11 +1,11 @@
package com.github.kfcfans.oms.worker.container;
import com.github.kfcfans.oms.common.ContainerConstant;
import com.github.kfcfans.oms.common.utils.CommonUtils;
import com.github.kfcfans.oms.worker.common.OmsWorkerException;
import com.github.kfcfans.oms.worker.core.classloader.OhMyClassLoader;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.support.ClassPathXmlApplicationContext;
@ -16,6 +16,7 @@ import java.io.InputStream;
import java.net.URL;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
/**
* OMS 容器实现
@ -26,26 +27,28 @@ import java.util.Properties;
@Slf4j
public class OmsJarContainer implements OmsContainer {
@Getter
private final Long id;
private final String name;
private final String localJarPath;
private final String md5;
private final File localJarFile;
// 引用计数器
private final AtomicInteger referenceCount = new AtomicInteger(0);
private OhMyClassLoader containerClassLoader;
private ClassPathXmlApplicationContext container;
private Map<String, BasicProcessor> processorCache = Maps.newConcurrentMap();
public OmsJarContainer(Long containerId, String containerName, String localJarPath) {
this.id = containerId;
this.name = containerName;
this.localJarPath = localJarPath;
public OmsJarContainer(String name, String md5, File localJarFile) {
this.name = name;
this.md5 = md5;
this.localJarFile = localJarFile;
}
@Override
public BasicProcessor getProcessor(String className) {
return processorCache.computeIfAbsent(className, ignore -> {
BasicProcessor basicProcessor = processorCache.computeIfAbsent(className, ignore -> {
Class<?> targetClass;
try {
targetClass = containerClassLoader.loadClass(className);
@ -62,6 +65,9 @@ public class OmsJarContainer implements OmsContainer {
} catch (ClassCastException cce) {
log.error("[OmsJarContainer-{}] {} should implements the Processor interface!", name, className);
return null;
} catch (Exception e) {
log.error("[OmsJarContainer-{}] get bean failed for {}.", name, className, e);
return null;
}
// 直接实例化
@ -75,15 +81,20 @@ public class OmsJarContainer implements OmsContainer {
}
return null;
});
if (basicProcessor != null) {
// 引用计数 + 1
referenceCount.getAndIncrement();
}
return basicProcessor;
}
@Override
public void init() throws Exception {
log.info("[OmsJarContainer] start to init container(id={},name={},jarPath={})", id, name, localJarPath);
log.info("[OmsJarContainer] start to init container(name={},jarPath={})", name, localJarFile.getPath());
File file = new File(localJarPath);
URL jarURL = file.toURI().toURL();
URL jarURL = localJarFile.toURI().toURL();
// 创建类加载器
this.containerClassLoader = new OhMyClassLoader(new URL[]{jarURL}, this.getClass().getClassLoader());
@ -93,11 +104,11 @@ public class OmsJarContainer implements OmsContainer {
URL springXmlURL = containerClassLoader.getResource(ContainerConstant.SPRING_CONTEXT_FILE_NAME);
if (propertiesURL == null) {
log.error("[OmsJarContainer] can't find {} in jar {}.", ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarPath);
log.error("[OmsJarContainer] can't find {} in jar {}.", ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath());
throw new OmsWorkerException("invalid jar file");
}
if (springXmlURL == null) {
log.error("[OmsJarContainer] can't find {} in jar {}.", ContainerConstant.SPRING_CONTEXT_FILE_NAME, localJarPath);
log.error("[OmsJarContainer] can't find {} in jar {}.", ContainerConstant.SPRING_CONTEXT_FILE_NAME, localJarFile.getPath());
throw new OmsWorkerException("invalid jar file");
}
@ -121,11 +132,14 @@ public class OmsJarContainer implements OmsContainer {
this.container.setClassLoader(containerClassLoader);
this.container.refresh();
log.info("[OmsJarContainer] init container(id={},name={},jarPath={}) successfully", id, name, localJarPath);
log.info("[OmsJarContainer] init container(name={},jarPath={}) successfully", name, localJarFile.getPath());
}
@Override
public void destroy() throws Exception {
// 没有其余引用时才允许执行 destroy
if (referenceCount.get() <= 0) {
try {
processorCache.clear();
container.close();
@ -134,5 +148,35 @@ public class OmsJarContainer implements OmsContainer {
}catch (Exception e) {
log.error("[OmsJarContainer-{}] container destroyed failed", name, e);
}
return;
}
log.warn("[OmsJarContainer-{}] container's reference count is {}, won't destroy now!", name, referenceCount.get());
}
@Override
public String getName() {
return name;
}
@Override
public String getMd5() {
return md5;
}
@Override
public void tryRelease() {
log.debug("[OmsJarContainer-{}] tryRelease, current reference is {}.", name, referenceCount.get());
// 需要满足的条件引用计数器减为0 & 有更新的容器出现
if (referenceCount.decrementAndGet() <= 0) {
OmsContainer container = OmsContainerFactory.getContainer(name);
if (container != this) {
try {
destroy();
}catch (Exception ignore) {
}
}
}
}
}

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.worker.core.processor.built;
import com.github.kfcfans.oms.worker.common.utils.OmsWorkerFileUtils;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
@ -30,13 +31,12 @@ public abstract class ScriptProcessor implements BasicProcessor {
private final long timeout;
private final ExecutorService threadPool;
private static final String USER_HOME = System.getProperty("user.home", "oms");
private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
public ScriptProcessor(Long instanceId, String processorInfo, long timeout, ExecutorService pool) throws Exception {
this.instanceId = instanceId;
this.scriptPath = USER_HOME + "/oms/script/" + genScriptName(instanceId);
this.scriptPath = OmsWorkerFileUtils.getScriptDir() + genScriptName(instanceId);
this.timeout = timeout;
this.threadPool = pool;

View File

@ -10,6 +10,8 @@ import com.github.kfcfans.oms.common.RemoteConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.container.OmsContainer;
import com.github.kfcfans.oms.worker.container.OmsContainerFactory;
import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory;
import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable;
import com.github.kfcfans.oms.worker.core.processor.built.PythonProcessor;
@ -47,6 +49,8 @@ public class ProcessorTracker {
// 任务执行器
private BasicProcessor processor;
// 容器可能为空
private OmsContainer omsContainer;
// 在线日志
private OmsLogger omsLogger;
@ -148,13 +152,17 @@ public class ProcessorTracker {
*/
public void destroy() {
// 0. 移除Container引用
if (omsContainer != null) {
omsContainer.tryRelease();
}
// 1. 关闭执行执行线程池
CommonUtils.executeIgnoreException(() -> {
List<Runnable> tasks = threadPool.shutdownNow();
if (!CollectionUtils.isEmpty(tasks)) {
log.warn("[ProcessorTracker-{}] shutdown threadPool now and stop {} tasks.", instanceId, tasks.size());
}
return null;
});
// 2. 去除顶层引用送入GC世界
@ -255,6 +263,13 @@ public class ProcessorTracker {
case PYTHON:
processor = new PythonProcessor(instanceId, processorInfo, instanceInfo.getInstanceTimeoutMS(), threadPool);
break;
case JAVA_CONTAINER:
String[] split = processorInfo.split("#");
omsContainer = OmsContainerFactory.getContainer(split[0]);
if (omsContainer != null) {
processor = omsContainer.getProcessor(split[1]);
}
break;
default:
log.warn("[ProcessorRunnable-{}] unknown processor type: {}.", instanceId, processorType);
throw new IllegalArgumentException("unknown processor type of " + processorType);