From 21d5d4498fbc2049c312c121f0b51fadb4c10a03 Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 20 May 2020 23:05:40 +0800 Subject: [PATCH] [fix] change container's unique to id (name can change, so name can't be the uk) --- .../WorkerNeedDeployContainerRequest.java | 2 +- .../oms/server/akka/actors/ServerActor.java | 2 +- .../oms/server/service/ContainerService.java | 12 +++---- .../ContainerDeployServerEndpoint.java | 12 +++---- .../worker/container/OmsContainerFactory.java | 29 ++++++++-------- .../oms/worker/container/OmsJarContainer.java | 34 +++++++++---------- .../tracker/processor/ProcessorTracker.java | 2 +- 7 files changed, 47 insertions(+), 46 deletions(-) diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/WorkerNeedDeployContainerRequest.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/WorkerNeedDeployContainerRequest.java index 41d4a5d0..48f9f5d3 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/WorkerNeedDeployContainerRequest.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/WorkerNeedDeployContainerRequest.java @@ -15,5 +15,5 @@ import lombok.NoArgsConstructor; @NoArgsConstructor @AllArgsConstructor public class WorkerNeedDeployContainerRequest implements OmsSerializable { - private String containerName; + private Long containerId; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java index 4e446b62..8a243027 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java @@ -87,7 +87,7 @@ public class ServerActor extends AbstractActor { Environment environment = SpringUtils.getBean(Environment.class); String port = environment.getProperty("local.server.port"); - Optional containerInfoOpt = containerInfoRepository.findByContainerName(req.getContainerName()); + Optional containerInfoOpt = containerInfoRepository.findById(req.getContainerId()); AskResponse askResponse = new AskResponse(); askResponse.setSuccess(false); if (containerInfoOpt.isPresent()) { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java index 59fea182..fc43e306 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java @@ -185,13 +185,13 @@ public class ContainerService { /** * 部署容器 - * @param containerName 容器名称 + * @param containerId 容器ID * @param session WebSocket Session * @throws Exception 异常 */ - public void deploy(String containerName, Session session) throws Exception { + public void deploy(Long containerId, Session session) throws Exception { - String deployLock = "containerDeployLock-" + containerName; + String deployLock = "containerDeployLock-" + containerId; RemoteEndpoint.Async remote = session.getAsyncRemote(); // 最长部署时间:10分钟 boolean lock = lockService.lock(deployLock, 10 * 60 * 1000); @@ -202,9 +202,9 @@ public class ContainerService { try { - Optional containerInfoOpt = containerInfoRepository.findByContainerName(containerName); + Optional containerInfoOpt = containerInfoRepository.findById(containerId); if (!containerInfoOpt.isPresent()) { - remote.sendText("SYSTEM: can't find container by name: " + containerName); + remote.sendText("SYSTEM: can't find container by id: " + containerId); return; } ContainerInfoDO container = containerInfoOpt.get(); @@ -240,7 +240,7 @@ public class ContainerService { String port = environment.getProperty("local.server.port"); String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, container.getVersion()); - ServerDeployContainerRequest req = new ServerDeployContainerRequest(container.getId(), containerName, container.getVersion(), downloadURL); + ServerDeployContainerRequest req = new ServerDeployContainerRequest(containerId, container.getContainerName(), container.getVersion(), downloadURL); long sleepTime = calculateSleepTime(jarFile.length()); AtomicInteger count = new AtomicInteger(); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/websocket/ContainerDeployServerEndpoint.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/websocket/ContainerDeployServerEndpoint.java index bf0ec691..10a65746 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/websocket/ContainerDeployServerEndpoint.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/websocket/ContainerDeployServerEndpoint.java @@ -21,21 +21,21 @@ import java.io.IOException; */ @Slf4j @Component -@ServerEndpoint(value = "/container/deploy/{name}", configurator = OmsEndpointConfigure.class) +@ServerEndpoint(value = "/container/deploy/{id}", configurator = OmsEndpointConfigure.class) public class ContainerDeployServerEndpoint { @Resource private ContainerService containerService; @OnOpen - public void onOpen(@PathParam("name") String name, Session session) { + public void onOpen(@PathParam("id") Long id, Session session) { RemoteEndpoint.Async remote = session.getAsyncRemote(); - remote.sendText("SYSTEM: connected successfully, start to deploy container: " + name); + remote.sendText("SYSTEM: connected successfully, start to deploy container: " + id); try { - containerService.deploy(name, session); + containerService.deploy(id, session); }catch (Exception e) { - log.error("[ContainerDeployServerEndpoint] deploy container {} failed.", name, e); + log.error("[ContainerDeployServerEndpoint] deploy container {} failed.", id, e); remote.sendText("SYSTEM: deploy failed because of the exception"); remote.sendText(ExceptionUtils.getStackTrace(e)); @@ -43,7 +43,7 @@ public class ContainerDeployServerEndpoint { try { session.close(); }catch (Exception e) { - log.error("[ContainerDeployServerEndpoint] close session for {} failed.", name, e); + log.error("[ContainerDeployServerEndpoint] close session for {} failed.", id, e); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainerFactory.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainerFactory.java index 75873a3e..c6b14d3d 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainerFactory.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainerFactory.java @@ -33,22 +33,22 @@ import java.util.concurrent.TimeUnit; @Slf4j public class OmsContainerFactory { - private static final Map CARGO = Maps.newConcurrentMap(); + private static final Map CARGO = Maps.newConcurrentMap(); /** * 获取容器 - * @param name 容器名称 + * @param containerId 容器ID * @return 容器示例,可能为 null */ - public static OmsContainer getContainer(String name) { + public static OmsContainer getContainer(Long containerId) { - OmsContainer omsContainer = CARGO.get(name); + OmsContainer omsContainer = CARGO.get(containerId); if (omsContainer != null) { return omsContainer; } // 尝试下载 - WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(name); + WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(containerId); String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); if (StringUtils.isEmpty(serverPath)) { @@ -65,10 +65,10 @@ public class OmsContainerFactory { deployContainer(deployRequest); } }catch (Exception e) { - log.error("[OmsContainer] get container(name={}) failed.", name, e); + log.error("[OmsContainerFactory] get container(id={}) failed.", containerId, e); } - return CARGO.get(name); + return CARGO.get(containerId); } @@ -78,33 +78,34 @@ public class OmsContainerFactory { */ public static synchronized void deployContainer(ServerDeployContainerRequest request) { + Long containerId = request.getContainerId(); String containerName = request.getContainerName(); String version = request.getVersion(); - OmsContainer oldContainer = CARGO.get(containerName); + OmsContainer oldContainer = CARGO.get(containerId); if (oldContainer != null && version.equals(oldContainer.getVersion())) { - log.info("[OmsContainerFactory] container(name={},version={}) already deployed.", containerName, version); + log.info("[OmsContainerFactory] container(id={},version={}) already deployed.", containerId, version); return; } try { // 下载Container到本地 - String filePath = OmsWorkerFileUtils.getContainerDir() + containerName + "/" + version + ".jar"; + String filePath = OmsWorkerFileUtils.getContainerDir() + containerId + "/" + version + ".jar"; File jarFile = new File(filePath); if (!jarFile.exists()) { FileUtils.forceMkdirParent(jarFile); FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000); - log.info("[OmsContainerFactory] download Jar for container({}) successfully.", containerName); + log.info("[OmsContainerFactory] download Jar for container(id={}) successfully.", containerId); } // 创建新容器 - OmsContainer newContainer = new OmsJarContainer(request.getContainerId(), containerName, version, jarFile); + OmsContainer newContainer = new OmsJarContainer(containerId, containerName, version, jarFile); newContainer.init(); // 替换容器 - CARGO.put(containerName, newContainer); - log.info("[OmsContainerFactory] container(name={},version={}) deployed successfully.", containerName, version); + CARGO.put(containerId, newContainer); + log.info("[OmsContainerFactory] container(id={},name={},version={}) deployed successfully.", containerId, containerName, version); if (oldContainer != null) { // 销毁旧容器 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java index 029d900e..e375f47e 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java @@ -56,7 +56,7 @@ public class OmsJarContainer implements OmsContainer { try { targetClass = containerClassLoader.loadClass(className); } catch (ClassNotFoundException cnf) { - log.error("[OmsJarContainer-{}] can't find class: {} in container.", name, className); + log.error("[OmsJarContainer-{}] can't find class: {} in container.", containerId, className); return null; } @@ -64,12 +64,12 @@ public class OmsJarContainer implements OmsContainer { try { return (BasicProcessor) container.getBean(targetClass); } catch (BeansException be) { - log.warn("[OmsJarContainer-{}] load instance from spring container failed, try to build instance directly.", name); + log.warn("[OmsJarContainer-{}] load instance from spring container failed, try to build instance directly.", containerId); } catch (ClassCastException cce) { - log.error("[OmsJarContainer-{}] {} should implements the Processor interface!", name, className); + log.error("[OmsJarContainer-{}] {} should implements the Processor interface!", containerId, className); return null; } catch (Exception e) { - log.error("[OmsJarContainer-{}] get bean failed for {}.", name, className, e); + log.error("[OmsJarContainer-{}] get bean failed for {}.", containerId, className, e); return null; } @@ -78,7 +78,7 @@ public class OmsJarContainer implements OmsContainer { Object obj = targetClass.getDeclaredConstructor().newInstance(); return (BasicProcessor) obj; } catch (Exception e) { - log.error("[OmsJarContainer-{}] load {} failed", name, className, e); + log.error("[OmsJarContainer-{}] load {} failed", containerId, className, e); } return null; }); @@ -93,7 +93,7 @@ public class OmsJarContainer implements OmsContainer { @Override public void init() throws Exception { - log.info("[OmsJarContainer] start to init container(name={},jarPath={})", name, localJarFile.getPath()); + log.info("[OmsJarContainer-{}] start to init container(name={},jarPath={})", containerId, name, localJarFile.getPath()); URL jarURL = localJarFile.toURI().toURL(); @@ -105,11 +105,11 @@ public class OmsJarContainer implements OmsContainer { URL springXmlURL = containerClassLoader.getResource(ContainerConstant.SPRING_CONTEXT_FILE_NAME); if (propertiesURL == null) { - log.error("[OmsJarContainer] can't find {} in jar {}.", ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath()); + log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath()); throw new OmsWorkerException("invalid jar file"); } if (springXmlURL == null) { - log.error("[OmsJarContainer] can't find {} in jar {}.", ContainerConstant.SPRING_CONTEXT_FILE_NAME, localJarFile.getPath()); + log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.SPRING_CONTEXT_FILE_NAME, localJarFile.getPath()); throw new OmsWorkerException("invalid jar file"); } @@ -117,11 +117,11 @@ public class OmsJarContainer implements OmsContainer { Properties properties = new Properties(); try (InputStream is = propertiesURL.openStream()) { properties.load(is); - log.info("[OmsJarContainer] load container properties successfully: {}", properties); + log.info("[OmsJarContainer-{}] load container properties successfully: {}", containerId, properties); } String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY); if (StringUtils.isEmpty(packageName)) { - log.error("[OmsJarContainer] get package name failed, developer should't modify the properties file!"); + log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId); throw new OmsWorkerException("invalid jar file"); } @@ -133,7 +133,7 @@ public class OmsJarContainer implements OmsContainer { this.container.setClassLoader(containerClassLoader); this.container.refresh(); - log.info("[OmsJarContainer] init container(name={},jarPath={}) successfully", name, localJarFile.getPath()); + log.info("[OmsJarContainer] init container(name={},jarPath={}) successfully", containerId, localJarFile.getPath()); } @Override @@ -146,20 +146,20 @@ public class OmsJarContainer implements OmsContainer { FileUtils.forceDelete(localJarFile); } }catch (Exception e) { - log.warn("[OmsJarContainer-{}] delete jarFile({}) failed.", name, localJarFile.getPath(), e); + log.warn("[OmsJarContainer-{}] delete jarFile({}) failed.", containerId, localJarFile.getPath(), e); } try { processorCache.clear(); container.close(); containerClassLoader.close(); - log.info("[OmsJarContainer-{}] container destroyed successfully", name); + log.info("[OmsJarContainer-{}] container destroyed successfully", containerId); }catch (Exception e) { - log.error("[OmsJarContainer-{}] container destroyed failed", name, e); + log.error("[OmsJarContainer-{}] container destroyed failed", containerId, e); } return; } - log.warn("[OmsJarContainer-{}] container's reference count is {}, won't destroy now!", name, referenceCount.get()); + log.warn("[OmsJarContainer-{}] container's reference count is {}, won't destroy now!", containerId, referenceCount.get()); } @Override @@ -186,11 +186,11 @@ public class OmsJarContainer implements OmsContainer { @Override public void tryRelease() { - log.debug("[OmsJarContainer-{}] tryRelease, current reference is {}.", name, referenceCount.get()); + log.debug("[OmsJarContainer-{}] tryRelease, current reference is {}.", containerId, referenceCount.get()); // 需要满足的条件:引用计数器减为0 & 有更新的容器出现 if (referenceCount.decrementAndGet() <= 0) { - OmsContainer container = OmsContainerFactory.getContainer(name); + OmsContainer container = OmsContainerFactory.getContainer(containerId); if (container != this) { try { destroy(); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index 7a262ed0..45f2bc8e 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -266,7 +266,7 @@ public class ProcessorTracker { break; case JAVA_CONTAINER: String[] split = processorInfo.split("#"); - omsContainer = OmsContainerFactory.getContainer(split[0]); + omsContainer = OmsContainerFactory.getContainer(Long.valueOf(split[0])); if (omsContainer != null) { processor = omsContainer.getProcessor(split[1]); }