feat: use PowerJobRemoteEngine to replace akka

This commit is contained in:
tjq 2023-01-20 14:19:09 +08:00
parent f0da89503e
commit 2020f72905
11 changed files with 56 additions and 25 deletions

View File

@ -14,12 +14,7 @@ public class RemoteConstant {
public static final String WORKER_ACTOR_SYSTEM_NAME = "oms";
public static final String TASK_TRACKER_ACTOR_NAME = "task_tracker";
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
public static final String WORKER_ACTOR_NAME = "worker";
public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting";
public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf";
/* ************************ AKKA SERVER ************************ */

View File

@ -26,10 +26,6 @@ public class HandlerLocation implements Serializable {
* 方法路径
*/
private String methodPath;
/**
* 调用的集群类型用于兼容 AKKA 等除了IP还需要指定 system 访问的情况
*/
private ServerType serverType;
public String toPath() {
return String.format("/%s/%s", rootPath, methodPath);

View File

@ -14,6 +14,12 @@ import java.io.Serializable;
@Data
@Accessors(chain = true)
public class URL implements Serializable {
/**
* 调用的集群类型用于兼容 AKKA 等除了IP还需要指定 system 访问的情况
*/
private ServerType serverType;
/**
* remote address
*/

View File

@ -4,6 +4,7 @@ import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import tech.powerjob.common.RemoteConstant;
import java.util.Map;
@ -21,7 +22,7 @@ public class AkkaMappingService {
private static final Map<String, ActorConfig> RP_2_ACTOR_CFG = Maps.newHashMap();
static {
// TODO: 迁移时写入规则
addMappingRule(RemoteConstant.SERVER_PATH, "server_actor", null);
}
private static final String DEFAULT_DISPATCH_NAME = "common-dispatcher";
@ -46,4 +47,11 @@ public class AkkaMappingService {
private String actorName;
private String dispatcherName;
}
private static void addMappingRule(String newActorPath, String oldActorName, String dispatchName) {
ActorConfig actorConfig = new ActorConfig()
.setActorName(oldActorName)
.setDispatcherName(dispatchName == null ? DEFAULT_DISPATCH_NAME : dispatchName);
RP_2_ACTOR_CFG.put(newActorPath, actorConfig);
}
}

View File

@ -3,22 +3,17 @@ package tech.powerjob.remote.akka;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import com.google.common.collect.Maps;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.remote.framework.base.HandlerLocation;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.transporter.Protocol;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
/**
* AkkaTransporter
@ -61,7 +56,7 @@ public class AkkaTransporter implements Transporter {
private ActorSelection fetchActorSelection(URL url) {
HandlerLocation location = url.getLocation();
String targetActorSystemName = AkkaConstant.fetchActorSystemName(location.getServerType());
String targetActorSystemName = AkkaConstant.fetchActorSystemName(url.getServerType());
String targetActorName = AkkaMappingService.parseActorName(location.getRootPath()).getActorName();

View File

@ -12,6 +12,8 @@
<property name="CONSOLE_LOG_PATTERN"
value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{20}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
<logger name="MONITOR_LOGGER_DB_OPERATION" level="OFF"/>
<!-- Configuration for console output. -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>

View File

@ -20,7 +20,10 @@
<junit.version>5.9.1</junit.version>
<logback.version>1.2.9</logback.version>
<powerjob-remote-framework.version>4.2.1</powerjob-remote-framework.version>
<powerjob-remote-impl-akka.version>4.2.1</powerjob-remote-impl-akka.version>
<powerjob-remote-impl-http.version>4.2.1</powerjob-remote-impl-http.version>
</properties>
<dependencies>
@ -32,13 +35,27 @@
<version>${spring.version}</version>
</dependency>
<!-- oms-common -->
<!-- PowerJob 通讯框架 -->
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-remote-framework</artifactId>
<version>${powerjob-remote-framework.version}</version>
</dependency>
<!-- PowerJob 通讯层具体实现,按需选择自己需要的通讯器即可。如果包冲突可尝试更换通讯器实现。 -->
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-remote-impl-akka</artifactId>
<version>${powerjob-remote-impl-akka.version}</version>
</dependency>
<!-- 4.3.x 版本考虑到兼容性,官方默认实现仍为 akka如需使用 HTTP 请仔细引入依赖 -->
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-remote-impl-http</artifactId>
<version>${powerjob-remote-impl-http.version}</version>
<scope>provided</scope>
</dependency>
<!-- h2 database -->
<dependency>
<groupId>com.h2database</groupId>

View File

@ -33,7 +33,6 @@ import tech.powerjob.worker.common.utils.SpringUtils;
import tech.powerjob.worker.core.executor.ExecutorManager;
import tech.powerjob.worker.persistence.TaskPersistenceService;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -49,7 +48,7 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean
private final WorkerRuntime workerRuntime = new WorkerRuntime();
private RemoteEngine remoteEngine;
private final RemoteEngine remoteEngine = new PowerJobRemoteEngine();
private final AtomicBoolean initialized = new AtomicBoolean();
@Override
@ -96,14 +95,18 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean
final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());
workerRuntime.setExecutorManager(executorManager);
// 初始化 ActorSystemmacOS上 new ServerSocket 检测端口占用的方法并不生效可能是AKKA是Scala写的缘故没办法...只能靠异常重试了
// 初始化 actor
TaskTrackerActor taskTrackerActor = new TaskTrackerActor(workerRuntime);
ProcessorTrackerActor processorTrackerActor = new ProcessorTrackerActor(workerRuntime);
WorkerActor workerActor = new WorkerActor(workerRuntime, taskTrackerActor);
// 初始化通讯引擎
EngineConfig engineConfig = new EngineConfig()
.setType("")
.setType(config.getProtocol().name())
.setServerType(ServerType.WORKER)
.setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort()))
.setActorList(Lists.newArrayList());
.setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));
remoteEngine = new PowerJobRemoteEngine();
EngineOutput engineOutput = remoteEngine.start(engineConfig);
workerRuntime.setTransporter(engineOutput.getTransporter());

View File

@ -5,6 +5,7 @@ import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.container.OmsContainerFactory;
import static tech.powerjob.common.RemoteConstant.*;
@ -19,9 +20,12 @@ import static tech.powerjob.common.RemoteConstant.*;
@Actor(path = WORKER_PATH)
public class WorkerActor {
private final WorkerRuntime workerRuntime;
private final TaskTrackerActor taskTrackerActor;
public WorkerActor(TaskTrackerActor taskTrackerActor) {
public WorkerActor(WorkerRuntime workerRuntime, TaskTrackerActor taskTrackerActor) {
this.workerRuntime = workerRuntime;
this.taskTrackerActor = taskTrackerActor;
}

View File

@ -4,6 +4,7 @@ import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.worker.common.constants.StoreStrategy;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.WorkflowContext;
@ -35,6 +36,10 @@ public class PowerJobWorkerConfig {
* Do not mistake for ActorSystem port. Do not add any prefix, i.e. http://.
*/
private List<String> serverAddress = Lists.newArrayList();
/**
* Protocol for communication between WORKER and server
*/
private Protocol protocol = Protocol.AKKA;
/**
* Max length of response result. Result that is longer than the value will be truncated.
* {@link ProcessResult} max length for #msg

View File

@ -106,10 +106,10 @@ public class TransportUtils {
public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {
HandlerLocation handlerLocation = new HandlerLocation()
.setServerType(serverType)
.setRootPath(rootPath)
.setMethodPath(handlerPath);
return new URL()
.setServerType(serverType)
.setAddress(Address.fromIpv4(address))
.setLocation(handlerLocation);
}