feat: akka proxy actor

This commit is contained in:
tjq 2023-01-06 23:34:36 +08:00
parent d73b8e21e6
commit 5d3bfedf5d

View File

@ -0,0 +1,60 @@
package tech.powerjob.remote.akka;
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.actor.HandlerInfo;
import tech.powerjob.remote.framework.base.HandlerLocation;
import tech.powerjob.remote.framework.utils.RemoteUtils;
import java.lang.reflect.Method;
import java.util.Optional;
/**
* 代理用的 actor
*
* @author tjq
* @since 2023/1/6
*/
@Slf4j
public class AkkaProxyActor extends AbstractActor {
private final Receive receive;
private final ActorInfo actorInfo;
public AkkaProxyActor(ActorInfo actorInfo) {
this.actorInfo = actorInfo;
final ReceiveBuilder receiveBuilder = receiveBuilder();
actorInfo.getHandlerInfos().forEach(handlerInfo -> {
final HandlerLocation location = handlerInfo.getLocation();
final Method handlerMethod = handlerInfo.getMethod();
final Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(handlerMethod.getParameterTypes());
if (!powerSerializeClz.isPresent()) {
throw new PowerJobException("build proxy for handler failed due to handler args is not PowerSerialize: " + location);
}
final Class<?> bindClz = powerSerializeClz.get();
receiveBuilder.match(bindClz, req -> onReceiveProcessorReportTaskStatusReq(req, handlerInfo));
log.info("[PowerJobProxyActor] bind handler[{}] to [{}]", location, bindClz);
});
this.receive = receiveBuilder.build();
}
@Override
public Receive createReceive() {
return receive;
}
private <T> void onReceiveProcessorReportTaskStatusReq(T req, HandlerInfo handlerInfo) {
try {
final Object ret = handlerInfo.getMethod().invoke(actorInfo.getActor(), req);
if (ret != null) {
getSender().tell(ret, getSelf());
}
} catch (Exception e) {
log.error("[TaskTrackerProxyActor] process failed!", e);
}
}
}