mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
[fix] change container's unique to id (name can change, so name can't be the uk)
This commit is contained in:
parent
f1d471d3a1
commit
21d5d4498f
@ -15,5 +15,5 @@ import lombok.NoArgsConstructor;
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class WorkerNeedDeployContainerRequest implements OmsSerializable {
|
||||
private String containerName;
|
||||
private Long containerId;
|
||||
}
|
||||
|
@ -87,7 +87,7 @@ public class ServerActor extends AbstractActor {
|
||||
Environment environment = SpringUtils.getBean(Environment.class);
|
||||
String port = environment.getProperty("local.server.port");
|
||||
|
||||
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findByContainerName(req.getContainerName());
|
||||
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(req.getContainerId());
|
||||
AskResponse askResponse = new AskResponse();
|
||||
askResponse.setSuccess(false);
|
||||
if (containerInfoOpt.isPresent()) {
|
||||
|
@ -185,13 +185,13 @@ public class ContainerService {
|
||||
|
||||
/**
|
||||
* 部署容器
|
||||
* @param containerName 容器名称
|
||||
* @param containerId 容器ID
|
||||
* @param session WebSocket Session
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
public void deploy(String containerName, Session session) throws Exception {
|
||||
public void deploy(Long containerId, Session session) throws Exception {
|
||||
|
||||
String deployLock = "containerDeployLock-" + containerName;
|
||||
String deployLock = "containerDeployLock-" + containerId;
|
||||
RemoteEndpoint.Async remote = session.getAsyncRemote();
|
||||
// 最长部署时间:10分钟
|
||||
boolean lock = lockService.lock(deployLock, 10 * 60 * 1000);
|
||||
@ -202,9 +202,9 @@ public class ContainerService {
|
||||
|
||||
try {
|
||||
|
||||
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findByContainerName(containerName);
|
||||
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(containerId);
|
||||
if (!containerInfoOpt.isPresent()) {
|
||||
remote.sendText("SYSTEM: can't find container by name: " + containerName);
|
||||
remote.sendText("SYSTEM: can't find container by id: " + containerId);
|
||||
return;
|
||||
}
|
||||
ContainerInfoDO container = containerInfoOpt.get();
|
||||
@ -240,7 +240,7 @@ public class ContainerService {
|
||||
|
||||
String port = environment.getProperty("local.server.port");
|
||||
String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, container.getVersion());
|
||||
ServerDeployContainerRequest req = new ServerDeployContainerRequest(container.getId(), containerName, container.getVersion(), downloadURL);
|
||||
ServerDeployContainerRequest req = new ServerDeployContainerRequest(containerId, container.getContainerName(), container.getVersion(), downloadURL);
|
||||
long sleepTime = calculateSleepTime(jarFile.length());
|
||||
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
|
@ -21,21 +21,21 @@ import java.io.IOException;
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ServerEndpoint(value = "/container/deploy/{name}", configurator = OmsEndpointConfigure.class)
|
||||
@ServerEndpoint(value = "/container/deploy/{id}", configurator = OmsEndpointConfigure.class)
|
||||
public class ContainerDeployServerEndpoint {
|
||||
|
||||
@Resource
|
||||
private ContainerService containerService;
|
||||
|
||||
@OnOpen
|
||||
public void onOpen(@PathParam("name") String name, Session session) {
|
||||
public void onOpen(@PathParam("id") Long id, Session session) {
|
||||
|
||||
RemoteEndpoint.Async remote = session.getAsyncRemote();
|
||||
remote.sendText("SYSTEM: connected successfully, start to deploy container: " + name);
|
||||
remote.sendText("SYSTEM: connected successfully, start to deploy container: " + id);
|
||||
try {
|
||||
containerService.deploy(name, session);
|
||||
containerService.deploy(id, session);
|
||||
}catch (Exception e) {
|
||||
log.error("[ContainerDeployServerEndpoint] deploy container {} failed.", name, e);
|
||||
log.error("[ContainerDeployServerEndpoint] deploy container {} failed.", id, e);
|
||||
|
||||
remote.sendText("SYSTEM: deploy failed because of the exception");
|
||||
remote.sendText(ExceptionUtils.getStackTrace(e));
|
||||
@ -43,7 +43,7 @@ public class ContainerDeployServerEndpoint {
|
||||
try {
|
||||
session.close();
|
||||
}catch (Exception e) {
|
||||
log.error("[ContainerDeployServerEndpoint] close session for {} failed.", name, e);
|
||||
log.error("[ContainerDeployServerEndpoint] close session for {} failed.", id, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -33,22 +33,22 @@ import java.util.concurrent.TimeUnit;
|
||||
@Slf4j
|
||||
public class OmsContainerFactory {
|
||||
|
||||
private static final Map<String, OmsContainer> CARGO = Maps.newConcurrentMap();
|
||||
private static final Map<Long, OmsContainer> CARGO = Maps.newConcurrentMap();
|
||||
|
||||
/**
|
||||
* 获取容器
|
||||
* @param name 容器名称
|
||||
* @param containerId 容器ID
|
||||
* @return 容器示例,可能为 null
|
||||
*/
|
||||
public static OmsContainer getContainer(String name) {
|
||||
public static OmsContainer getContainer(Long containerId) {
|
||||
|
||||
OmsContainer omsContainer = CARGO.get(name);
|
||||
OmsContainer omsContainer = CARGO.get(containerId);
|
||||
if (omsContainer != null) {
|
||||
return omsContainer;
|
||||
}
|
||||
|
||||
// 尝试下载
|
||||
WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(name);
|
||||
WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(containerId);
|
||||
|
||||
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
|
||||
if (StringUtils.isEmpty(serverPath)) {
|
||||
@ -65,10 +65,10 @@ public class OmsContainerFactory {
|
||||
deployContainer(deployRequest);
|
||||
}
|
||||
}catch (Exception e) {
|
||||
log.error("[OmsContainer] get container(name={}) failed.", name, e);
|
||||
log.error("[OmsContainerFactory] get container(id={}) failed.", containerId, e);
|
||||
}
|
||||
|
||||
return CARGO.get(name);
|
||||
return CARGO.get(containerId);
|
||||
}
|
||||
|
||||
|
||||
@ -78,33 +78,34 @@ public class OmsContainerFactory {
|
||||
*/
|
||||
public static synchronized void deployContainer(ServerDeployContainerRequest request) {
|
||||
|
||||
Long containerId = request.getContainerId();
|
||||
String containerName = request.getContainerName();
|
||||
String version = request.getVersion();
|
||||
|
||||
OmsContainer oldContainer = CARGO.get(containerName);
|
||||
OmsContainer oldContainer = CARGO.get(containerId);
|
||||
if (oldContainer != null && version.equals(oldContainer.getVersion())) {
|
||||
log.info("[OmsContainerFactory] container(name={},version={}) already deployed.", containerName, version);
|
||||
log.info("[OmsContainerFactory] container(id={},version={}) already deployed.", containerId, version);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
// 下载Container到本地
|
||||
String filePath = OmsWorkerFileUtils.getContainerDir() + containerName + "/" + version + ".jar";
|
||||
String filePath = OmsWorkerFileUtils.getContainerDir() + containerId + "/" + version + ".jar";
|
||||
File jarFile = new File(filePath);
|
||||
if (!jarFile.exists()) {
|
||||
FileUtils.forceMkdirParent(jarFile);
|
||||
FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000);
|
||||
log.info("[OmsContainerFactory] download Jar for container({}) successfully.", containerName);
|
||||
log.info("[OmsContainerFactory] download Jar for container(id={}) successfully.", containerId);
|
||||
}
|
||||
|
||||
// 创建新容器
|
||||
OmsContainer newContainer = new OmsJarContainer(request.getContainerId(), containerName, version, jarFile);
|
||||
OmsContainer newContainer = new OmsJarContainer(containerId, containerName, version, jarFile);
|
||||
newContainer.init();
|
||||
|
||||
// 替换容器
|
||||
CARGO.put(containerName, newContainer);
|
||||
log.info("[OmsContainerFactory] container(name={},version={}) deployed successfully.", containerName, version);
|
||||
CARGO.put(containerId, newContainer);
|
||||
log.info("[OmsContainerFactory] container(id={},name={},version={}) deployed successfully.", containerId, containerName, version);
|
||||
|
||||
if (oldContainer != null) {
|
||||
// 销毁旧容器
|
||||
|
@ -56,7 +56,7 @@ public class OmsJarContainer implements OmsContainer {
|
||||
try {
|
||||
targetClass = containerClassLoader.loadClass(className);
|
||||
} catch (ClassNotFoundException cnf) {
|
||||
log.error("[OmsJarContainer-{}] can't find class: {} in container.", name, className);
|
||||
log.error("[OmsJarContainer-{}] can't find class: {} in container.", containerId, className);
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -64,12 +64,12 @@ public class OmsJarContainer implements OmsContainer {
|
||||
try {
|
||||
return (BasicProcessor) container.getBean(targetClass);
|
||||
} catch (BeansException be) {
|
||||
log.warn("[OmsJarContainer-{}] load instance from spring container failed, try to build instance directly.", name);
|
||||
log.warn("[OmsJarContainer-{}] load instance from spring container failed, try to build instance directly.", containerId);
|
||||
} catch (ClassCastException cce) {
|
||||
log.error("[OmsJarContainer-{}] {} should implements the Processor interface!", name, className);
|
||||
log.error("[OmsJarContainer-{}] {} should implements the Processor interface!", containerId, className);
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
log.error("[OmsJarContainer-{}] get bean failed for {}.", name, className, e);
|
||||
log.error("[OmsJarContainer-{}] get bean failed for {}.", containerId, className, e);
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -78,7 +78,7 @@ public class OmsJarContainer implements OmsContainer {
|
||||
Object obj = targetClass.getDeclaredConstructor().newInstance();
|
||||
return (BasicProcessor) obj;
|
||||
} catch (Exception e) {
|
||||
log.error("[OmsJarContainer-{}] load {} failed", name, className, e);
|
||||
log.error("[OmsJarContainer-{}] load {} failed", containerId, className, e);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
@ -93,7 +93,7 @@ public class OmsJarContainer implements OmsContainer {
|
||||
@Override
|
||||
public void init() throws Exception {
|
||||
|
||||
log.info("[OmsJarContainer] start to init container(name={},jarPath={})", name, localJarFile.getPath());
|
||||
log.info("[OmsJarContainer-{}] start to init container(name={},jarPath={})", containerId, name, localJarFile.getPath());
|
||||
|
||||
URL jarURL = localJarFile.toURI().toURL();
|
||||
|
||||
@ -105,11 +105,11 @@ public class OmsJarContainer implements OmsContainer {
|
||||
URL springXmlURL = containerClassLoader.getResource(ContainerConstant.SPRING_CONTEXT_FILE_NAME);
|
||||
|
||||
if (propertiesURL == null) {
|
||||
log.error("[OmsJarContainer] can't find {} in jar {}.", ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath());
|
||||
log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath());
|
||||
throw new OmsWorkerException("invalid jar file");
|
||||
}
|
||||
if (springXmlURL == null) {
|
||||
log.error("[OmsJarContainer] can't find {} in jar {}.", ContainerConstant.SPRING_CONTEXT_FILE_NAME, localJarFile.getPath());
|
||||
log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.SPRING_CONTEXT_FILE_NAME, localJarFile.getPath());
|
||||
throw new OmsWorkerException("invalid jar file");
|
||||
}
|
||||
|
||||
@ -117,11 +117,11 @@ public class OmsJarContainer implements OmsContainer {
|
||||
Properties properties = new Properties();
|
||||
try (InputStream is = propertiesURL.openStream()) {
|
||||
properties.load(is);
|
||||
log.info("[OmsJarContainer] load container properties successfully: {}", properties);
|
||||
log.info("[OmsJarContainer-{}] load container properties successfully: {}", containerId, properties);
|
||||
}
|
||||
String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY);
|
||||
if (StringUtils.isEmpty(packageName)) {
|
||||
log.error("[OmsJarContainer] get package name failed, developer should't modify the properties file!");
|
||||
log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId);
|
||||
throw new OmsWorkerException("invalid jar file");
|
||||
}
|
||||
|
||||
@ -133,7 +133,7 @@ public class OmsJarContainer implements OmsContainer {
|
||||
this.container.setClassLoader(containerClassLoader);
|
||||
this.container.refresh();
|
||||
|
||||
log.info("[OmsJarContainer] init container(name={},jarPath={}) successfully", name, localJarFile.getPath());
|
||||
log.info("[OmsJarContainer] init container(name={},jarPath={}) successfully", containerId, localJarFile.getPath());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -146,20 +146,20 @@ public class OmsJarContainer implements OmsContainer {
|
||||
FileUtils.forceDelete(localJarFile);
|
||||
}
|
||||
}catch (Exception e) {
|
||||
log.warn("[OmsJarContainer-{}] delete jarFile({}) failed.", name, localJarFile.getPath(), e);
|
||||
log.warn("[OmsJarContainer-{}] delete jarFile({}) failed.", containerId, localJarFile.getPath(), e);
|
||||
}
|
||||
try {
|
||||
processorCache.clear();
|
||||
container.close();
|
||||
containerClassLoader.close();
|
||||
log.info("[OmsJarContainer-{}] container destroyed successfully", name);
|
||||
log.info("[OmsJarContainer-{}] container destroyed successfully", containerId);
|
||||
}catch (Exception e) {
|
||||
log.error("[OmsJarContainer-{}] container destroyed failed", name, e);
|
||||
log.error("[OmsJarContainer-{}] container destroyed failed", containerId, e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
log.warn("[OmsJarContainer-{}] container's reference count is {}, won't destroy now!", name, referenceCount.get());
|
||||
log.warn("[OmsJarContainer-{}] container's reference count is {}, won't destroy now!", containerId, referenceCount.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -186,11 +186,11 @@ public class OmsJarContainer implements OmsContainer {
|
||||
@Override
|
||||
public void tryRelease() {
|
||||
|
||||
log.debug("[OmsJarContainer-{}] tryRelease, current reference is {}.", name, referenceCount.get());
|
||||
log.debug("[OmsJarContainer-{}] tryRelease, current reference is {}.", containerId, referenceCount.get());
|
||||
// 需要满足的条件:引用计数器减为0 & 有更新的容器出现
|
||||
if (referenceCount.decrementAndGet() <= 0) {
|
||||
|
||||
OmsContainer container = OmsContainerFactory.getContainer(name);
|
||||
OmsContainer container = OmsContainerFactory.getContainer(containerId);
|
||||
if (container != this) {
|
||||
try {
|
||||
destroy();
|
||||
|
@ -266,7 +266,7 @@ public class ProcessorTracker {
|
||||
break;
|
||||
case JAVA_CONTAINER:
|
||||
String[] split = processorInfo.split("#");
|
||||
omsContainer = OmsContainerFactory.getContainer(split[0]);
|
||||
omsContainer = OmsContainerFactory.getContainer(Long.valueOf(split[0]));
|
||||
if (omsContainer != null) {
|
||||
processor = omsContainer.getProcessor(split[1]);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user