From bfb9c68590d0f993751bf4a3bbe947c5d8e1980e Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 21 Jan 2023 22:37:18 +0800 Subject: [PATCH] feat: close remoteEngine when jvm exit --- .../remote/framework/engine/RemoteEngine.java | 4 +- .../engine/impl/PowerJobRemoteEngine.java | 9 +++-- powerjob-server/pom.xml | 7 ++++ .../core/handler/IWorkerRequestHandler.java | 3 +- .../server/remote/actoes/ServerActor.java | 10 ----- .../server/remote/server/FriendActor.java | 6 +-- .../impl/PowerTransportService.java | 39 ++++++++++++++++--- 7 files changed, 53 insertions(+), 25 deletions(-) delete mode 100644 powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/actoes/ServerActor.java diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/RemoteEngine.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/RemoteEngine.java index a591f2a9..e86d2722 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/RemoteEngine.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/RemoteEngine.java @@ -1,5 +1,7 @@ package tech.powerjob.remote.framework.engine; +import java.io.IOException; + /** * RemoteEngine * @@ -10,5 +12,5 @@ public interface RemoteEngine { EngineOutput start(EngineConfig engineConfig); - void close(); + void close() throws IOException; } diff --git a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java index 5292d4e7..dc09fdf5 100644 --- a/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java +++ b/powerjob-remote/powerjob-remote-framework/src/main/java/tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java @@ -10,6 +10,7 @@ import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.RemoteEngine; import tech.powerjob.remote.framework.transporter.Transporter; +import java.io.IOException; import java.util.List; /** @@ -21,6 +22,8 @@ import java.util.List; @Slf4j public class PowerJobRemoteEngine implements RemoteEngine { + private CSInitializer csInitializer; + @Override public EngineOutput start(EngineConfig engineConfig) { @@ -28,7 +31,7 @@ public class PowerJobRemoteEngine implements RemoteEngine { log.info("[PowerJobRemoteEngine] start remote engine with config: {}", engineConfig); List actorInfos = ActorFactory.load(engineConfig.getActorList()); - CSInitializer csInitializer = CSInitializerFactory.build(engineConfig.getType()); + csInitializer = CSInitializerFactory.build(engineConfig.getType()); String type = csInitializer.type(); @@ -53,7 +56,7 @@ public class PowerJobRemoteEngine implements RemoteEngine { } @Override - public void close() { - + public void close() throws IOException { + csInitializer.close(); } } diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index b9cc9460..c0aa342a 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -49,6 +49,8 @@ 3.0.10 9.1.6 + + 4.2.1 4.2.1 4.2.1 @@ -101,6 +103,11 @@ + + tech.powerjob + powerjob-common + ${powerjob-common.version} + tech.powerjob powerjob-remote-impl-http diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java index a5231685..9aac2484 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java @@ -4,7 +4,6 @@ import tech.powerjob.common.request.*; import tech.powerjob.common.response.AskResponse; import tech.powerjob.remote.framework.actor.Handler; import tech.powerjob.remote.framework.actor.ProcessType; -import tech.powerjob.server.remote.actoes.ServerActor; import java.util.Optional; @@ -16,7 +15,7 @@ import static tech.powerjob.common.RemoteConstant.*; * @author tjq * @since 2022/9/10 */ -public interface IWorkerRequestHandler extends ServerActor { +public interface IWorkerRequestHandler { /** * 处理 worker 上报的心跳信息 diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/actoes/ServerActor.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/actoes/ServerActor.java deleted file mode 100644 index 04f6fa60..00000000 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/actoes/ServerActor.java +++ /dev/null @@ -1,10 +0,0 @@ -package tech.powerjob.server.remote.actoes; - -/** - * ServerActor 声明接口 - * - * @author tjq - * @since 2023/1/21 - */ -public interface ServerActor { -} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java index 5e3ceeb2..e6d066cc 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java @@ -5,9 +5,9 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.stereotype.Component; import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.remote.framework.actor.Handler; import tech.powerjob.remote.framework.actor.ProcessType; -import tech.powerjob.server.remote.actoes.ServerActor; import tech.powerjob.server.remote.server.election.Ping; import tech.powerjob.server.remote.server.redirector.RemoteProcessReq; import tech.powerjob.server.remote.server.redirector.RemoteRequestProcessor; @@ -22,8 +22,8 @@ import static tech.powerjob.common.RemoteConstant.*; */ @Slf4j @Component -@Handler(path = S4S_PATH) -public class FriendActor implements ServerActor { +@Actor(path = S4S_PATH) +public class FriendActor { private static final String SK = "dGVuZ2ppcWlAZ21haWwuY29tIA=="; diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java index 8654f76d..26dad376 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transporter/impl/PowerTransportService.java @@ -5,14 +5,19 @@ import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; import tech.powerjob.common.OmsConstant; import tech.powerjob.common.PowerSerializable; import tech.powerjob.common.enums.Protocol; import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.RemotingException; import tech.powerjob.remote.framework.base.ServerType; @@ -21,7 +26,6 @@ import tech.powerjob.remote.framework.engine.EngineConfig; import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.RemoteEngine; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; -import tech.powerjob.server.remote.actoes.ServerActor; import tech.powerjob.server.remote.transporter.ProtocolInfo; import tech.powerjob.server.remote.transporter.TransportService; @@ -37,20 +41,22 @@ import java.util.concurrent.CompletionStage; */ @Slf4j @Service -public class PowerTransportService implements TransportService, InitializingBean { +public class PowerTransportService implements TransportService, InitializingBean, DisposableBean, ApplicationContextAware { @Value("${oms.transporter.active.protocols}") private String activeProtocols; private static final String PROTOCOL_PORT_CONFIG = "oms.%s.port"; private final Environment environment; - private final List serverActors; private ProtocolInfo defaultProtocol; private final Map protocol2Transporter = Maps.newHashMap(); - public PowerTransportService(List serverActors, Environment environment) { - this.serverActors = serverActors; + private final List engines = Lists.newArrayList(); + + private ApplicationContext applicationContext; + + public PowerTransportService(Environment environment) { this.environment = environment; } @@ -80,6 +86,11 @@ public class PowerTransportService implements TransportService, InitializingBean } private void initRemoteFrameWork(String protocol, int port) { + + // 从构造器注入改为从 applicationContext 获取来避免循环依赖 + final Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(Actor.class); + log.info("[PowerTransportService] find Actor num={},names={}", beansWithAnnotation.size(), beansWithAnnotation.keySet()); + Address address = new Address() .setHost(NetUtils.getLocalHost()) .setPort(port); @@ -87,12 +98,13 @@ public class PowerTransportService implements TransportService, InitializingBean .setServerType(ServerType.SERVER) .setType(protocol.toUpperCase()) .setBindAddress(address) - .setActorList(Lists.newArrayList(serverActors)); + .setActorList(Lists.newArrayList(beansWithAnnotation.values())); log.info("[PowerTransportService] start to initialize RemoteEngine[type={},address={}]", protocol, address); RemoteEngine re = new PowerJobRemoteEngine(); final EngineOutput engineOutput = re.start(engineConfig); log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address); + this.engines.add(re); this.protocol2Transporter.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter())); } @@ -168,4 +180,19 @@ public class PowerTransportService implements TransportService, InitializingBean throw new IllegalArgumentException("can't find default protocol, please check your config!"); } } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + @Override + public void destroy() throws Exception { + engines.forEach(e -> { + try { + e.close(); + } catch (Exception ignore) { + } + }); + } }