feat: close remoteEngine when jvm exit

This commit is contained in:
tjq 2023-01-21 22:37:18 +08:00
parent 25c6a9a6d6
commit bfb9c68590
7 changed files with 53 additions and 25 deletions

View File

@ -1,5 +1,7 @@
package tech.powerjob.remote.framework.engine; package tech.powerjob.remote.framework.engine;
import java.io.IOException;
/** /**
* RemoteEngine * RemoteEngine
* *
@ -10,5 +12,5 @@ public interface RemoteEngine {
EngineOutput start(EngineConfig engineConfig); EngineOutput start(EngineConfig engineConfig);
void close(); void close() throws IOException;
} }

View File

@ -10,6 +10,7 @@ import tech.powerjob.remote.framework.engine.EngineOutput;
import tech.powerjob.remote.framework.engine.RemoteEngine; import tech.powerjob.remote.framework.engine.RemoteEngine;
import tech.powerjob.remote.framework.transporter.Transporter; import tech.powerjob.remote.framework.transporter.Transporter;
import java.io.IOException;
import java.util.List; import java.util.List;
/** /**
@ -21,6 +22,8 @@ import java.util.List;
@Slf4j @Slf4j
public class PowerJobRemoteEngine implements RemoteEngine { public class PowerJobRemoteEngine implements RemoteEngine {
private CSInitializer csInitializer;
@Override @Override
public EngineOutput start(EngineConfig engineConfig) { public EngineOutput start(EngineConfig engineConfig) {
@ -28,7 +31,7 @@ public class PowerJobRemoteEngine implements RemoteEngine {
log.info("[PowerJobRemoteEngine] start remote engine with config: {}", engineConfig); log.info("[PowerJobRemoteEngine] start remote engine with config: {}", engineConfig);
List<ActorInfo> actorInfos = ActorFactory.load(engineConfig.getActorList()); List<ActorInfo> actorInfos = ActorFactory.load(engineConfig.getActorList());
CSInitializer csInitializer = CSInitializerFactory.build(engineConfig.getType()); csInitializer = CSInitializerFactory.build(engineConfig.getType());
String type = csInitializer.type(); String type = csInitializer.type();
@ -53,7 +56,7 @@ public class PowerJobRemoteEngine implements RemoteEngine {
} }
@Override @Override
public void close() { public void close() throws IOException {
csInitializer.close();
} }
} }

View File

@ -49,6 +49,8 @@
<groovy.version>3.0.10</groovy.version> <groovy.version>3.0.10</groovy.version>
<cron-utils.version>9.1.6</cron-utils.version> <cron-utils.version>9.1.6</cron-utils.version>
<powerjob-common.version>4.2.1</powerjob-common.version>
<powerjob-remote-impl-http.version>4.2.1</powerjob-remote-impl-http.version> <powerjob-remote-impl-http.version>4.2.1</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.2.1</powerjob-remote-impl-akka.version> <powerjob-remote-impl-akka.version>4.2.1</powerjob-remote-impl-akka.version>
</properties> </properties>
@ -101,6 +103,11 @@
<dependencies> <dependencies>
<!-- 网络层 --> <!-- 网络层 -->
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-common</artifactId>
<version>${powerjob-common.version}</version>
</dependency>
<dependency> <dependency>
<groupId>tech.powerjob</groupId> <groupId>tech.powerjob</groupId>
<artifactId>powerjob-remote-impl-http</artifactId> <artifactId>powerjob-remote-impl-http</artifactId>

View File

@ -4,7 +4,6 @@ import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.response.AskResponse;
import tech.powerjob.remote.framework.actor.Handler; import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.ProcessType; import tech.powerjob.remote.framework.actor.ProcessType;
import tech.powerjob.server.remote.actoes.ServerActor;
import java.util.Optional; import java.util.Optional;
@ -16,7 +15,7 @@ import static tech.powerjob.common.RemoteConstant.*;
* @author tjq * @author tjq
* @since 2022/9/10 * @since 2022/9/10
*/ */
public interface IWorkerRequestHandler extends ServerActor { public interface IWorkerRequestHandler {
/** /**
* 处理 worker 上报的心跳信息 * 处理 worker 上报的心跳信息

View File

@ -1,10 +0,0 @@
package tech.powerjob.server.remote.actoes;
/**
* ServerActor 声明接口
*
* @author tjq
* @since 2023/1/21
*/
public interface ServerActor {
}

View File

@ -5,9 +5,9 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.actor.Handler; import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.ProcessType; import tech.powerjob.remote.framework.actor.ProcessType;
import tech.powerjob.server.remote.actoes.ServerActor;
import tech.powerjob.server.remote.server.election.Ping; import tech.powerjob.server.remote.server.election.Ping;
import tech.powerjob.server.remote.server.redirector.RemoteProcessReq; import tech.powerjob.server.remote.server.redirector.RemoteProcessReq;
import tech.powerjob.server.remote.server.redirector.RemoteRequestProcessor; import tech.powerjob.server.remote.server.redirector.RemoteRequestProcessor;
@ -22,8 +22,8 @@ import static tech.powerjob.common.RemoteConstant.*;
*/ */
@Slf4j @Slf4j
@Component @Component
@Handler(path = S4S_PATH) @Actor(path = S4S_PATH)
public class FriendActor implements ServerActor { public class FriendActor {
private static final String SK = "dGVuZ2ppcWlAZ21haWwuY29tIA=="; private static final String SK = "dGVuZ2ppcWlAZ21haWwuY29tIA==";

View File

@ -5,14 +5,19 @@ import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import tech.powerjob.common.OmsConstant; import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.PowerSerializable; import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.enums.Protocol; import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.utils.NetUtils; import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.RemotingException; import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.ServerType; import tech.powerjob.remote.framework.base.ServerType;
@ -21,7 +26,6 @@ import tech.powerjob.remote.framework.engine.EngineConfig;
import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.EngineOutput;
import tech.powerjob.remote.framework.engine.RemoteEngine; import tech.powerjob.remote.framework.engine.RemoteEngine;
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
import tech.powerjob.server.remote.actoes.ServerActor;
import tech.powerjob.server.remote.transporter.ProtocolInfo; import tech.powerjob.server.remote.transporter.ProtocolInfo;
import tech.powerjob.server.remote.transporter.TransportService; import tech.powerjob.server.remote.transporter.TransportService;
@ -37,20 +41,22 @@ import java.util.concurrent.CompletionStage;
*/ */
@Slf4j @Slf4j
@Service @Service
public class PowerTransportService implements TransportService, InitializingBean { public class PowerTransportService implements TransportService, InitializingBean, DisposableBean, ApplicationContextAware {
@Value("${oms.transporter.active.protocols}") @Value("${oms.transporter.active.protocols}")
private String activeProtocols; private String activeProtocols;
private static final String PROTOCOL_PORT_CONFIG = "oms.%s.port"; private static final String PROTOCOL_PORT_CONFIG = "oms.%s.port";
private final Environment environment; private final Environment environment;
private final List<ServerActor> serverActors;
private ProtocolInfo defaultProtocol; private ProtocolInfo defaultProtocol;
private final Map<String, ProtocolInfo> protocol2Transporter = Maps.newHashMap(); private final Map<String, ProtocolInfo> protocol2Transporter = Maps.newHashMap();
public PowerTransportService(List<ServerActor> serverActors, Environment environment) { private final List<RemoteEngine> engines = Lists.newArrayList();
this.serverActors = serverActors;
private ApplicationContext applicationContext;
public PowerTransportService(Environment environment) {
this.environment = environment; this.environment = environment;
} }
@ -80,6 +86,11 @@ public class PowerTransportService implements TransportService, InitializingBean
} }
private void initRemoteFrameWork(String protocol, int port) { private void initRemoteFrameWork(String protocol, int port) {
// 从构造器注入改为从 applicationContext 获取来避免循环依赖
final Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(Actor.class);
log.info("[PowerTransportService] find Actor num={},names={}", beansWithAnnotation.size(), beansWithAnnotation.keySet());
Address address = new Address() Address address = new Address()
.setHost(NetUtils.getLocalHost()) .setHost(NetUtils.getLocalHost())
.setPort(port); .setPort(port);
@ -87,12 +98,13 @@ public class PowerTransportService implements TransportService, InitializingBean
.setServerType(ServerType.SERVER) .setServerType(ServerType.SERVER)
.setType(protocol.toUpperCase()) .setType(protocol.toUpperCase())
.setBindAddress(address) .setBindAddress(address)
.setActorList(Lists.newArrayList(serverActors)); .setActorList(Lists.newArrayList(beansWithAnnotation.values()));
log.info("[PowerTransportService] start to initialize RemoteEngine[type={},address={}]", protocol, address); log.info("[PowerTransportService] start to initialize RemoteEngine[type={},address={}]", protocol, address);
RemoteEngine re = new PowerJobRemoteEngine(); RemoteEngine re = new PowerJobRemoteEngine();
final EngineOutput engineOutput = re.start(engineConfig); final EngineOutput engineOutput = re.start(engineConfig);
log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address); log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address);
this.engines.add(re);
this.protocol2Transporter.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter())); this.protocol2Transporter.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter()));
} }
@ -168,4 +180,19 @@ public class PowerTransportService implements TransportService, InitializingBean
throw new IllegalArgumentException("can't find default protocol, please check your config!"); throw new IllegalArgumentException("can't find default protocol, please check your config!");
} }
} }
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void destroy() throws Exception {
engines.forEach(e -> {
try {
e.close();
} catch (Exception ignore) {
}
});
}
} }