feat: worker use PowerJobRemoteFramework

This commit is contained in:
tjq 2023-01-20 12:05:18 +08:00
parent 0400eceab1
commit 43df09bb38
16 changed files with 261 additions and 134 deletions

View File

@ -33,4 +33,47 @@ public class RemoteConstant {
/* ************************ OTHERS ************************ */ /* ************************ OTHERS ************************ */
public static final String EMPTY_ADDRESS = "N/A"; public static final String EMPTY_ADDRESS = "N/A";
public static final long DEFAULT_TIMEOUT_MS = 5000; public static final long DEFAULT_TIMEOUT_MS = 5000;
/* ************************ SERVER ************************ */
public static final String SERVER_PATH = "server";
public static final String SERVER_HANDLER_REPORT_LOG = "reportLog";
/* ************************ Worker-TaskTracker ************************ */
public static final String WTT_PATH = "taskTracker";
/**
* server 任务执行命令
*/
public static final String WTT_HANDLER_RUN_JOB = "runJob";
/**
* server 停止任务实例命令
*/
public static final String WTT_HANDLER_STOP_INSTANCE = "stopInstance";
/**
* sever 查询任务状态
*/
public static final String WTT_HANDLER_QUERY_INSTANCE_STATUS = "queryInstanceStatus";
/**
* PT 上报任务状态包含执行结果
*/
public static final String WTT_HANDLER_REPORT_TASK_STATUS = "reportTaskStatus";
/**
* PT 上报自身状态
*/
public static final String WTT_HANDLER_REPORT_PROCESSOR_TRACKER_STATUS = "reportProcessorTrackerStatus";
/**
* Map 任务
*/
public static final String WTT_HANDLER_MAP_TASK = "mapTask";
/* ************************ Worker-ProcessorTracker ************************ */
public static final String WPT_PATH = "processorTracker";
public static final String WPT_HANDLER_START_TASK = "startTask";
public static final String WPT_HANDLER_STOP_INSTANCE = "stopInstance";
} }

View File

@ -22,4 +22,19 @@ public class Address implements Serializable {
public String toFullAddress() { public String toFullAddress() {
return String.format("%s:%d", host, port); return String.format("%s:%d", host, port);
} }
public static Address fromIpv4(String ipv4) {
String[] split = ipv4.split(":");
return new Address()
.setHost(split[0])
.setPort(Integer.parseInt(split[1]));
}
@Override
public String toString() {
return "Address{" +
"host='" + host + '\'' +
", port=" + port +
'}';
}
} }

View File

@ -9,4 +9,6 @@ package tech.powerjob.remote.framework.engine;
public interface RemoteEngine { public interface RemoteEngine {
EngineOutput start(EngineConfig engineConfig); EngineOutput start(EngineConfig engineConfig);
void close();
} }

View File

@ -51,4 +51,9 @@ public class PowerJobRemoteEngine implements RemoteEngine {
return engineOutput; return engineOutput;
} }
@Override
public void close() {
}
} }

View File

@ -0,0 +1,24 @@
package tech.powerjob.remote.framework.base;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
/**
* test address
*
* @author tjq
* @since 2023/1/20
*/
@Slf4j
class AddressTest {
@Test
void testAddress() {
String ip = "192.168.1.1:10085";
final Address address = Address.fromIpv4(ip);
log.info("[AddressTest] parse address: {}", address);
assert ip.equals(address.toFullAddress());
}
}

View File

@ -15,12 +15,12 @@
<properties> <properties>
<spring.version>5.3.23</spring.version> <spring.version>5.3.23</spring.version>
<powerjob.common.version>4.2.1</powerjob.common.version>
<h2.db.version>2.1.214</h2.db.version> <h2.db.version>2.1.214</h2.db.version>
<hikaricp.version>4.0.3</hikaricp.version> <hikaricp.version>4.0.3</hikaricp.version>
<junit.version>5.9.1</junit.version> <junit.version>5.9.1</junit.version>
<logback.version>1.2.9</logback.version> <logback.version>1.2.9</logback.version>
<powerjob-remote-framework.version>4.2.1</powerjob-remote-framework.version>
</properties> </properties>
<dependencies> <dependencies>
@ -35,8 +35,8 @@
<!-- oms-common --> <!-- oms-common -->
<dependency> <dependency>
<groupId>tech.powerjob</groupId> <groupId>tech.powerjob</groupId>
<artifactId>powerjob-common</artifactId> <artifactId>powerjob-remote-framework</artifactId>
<version>${powerjob.common.version}</version> <version>${powerjob-remote-framework.version}</version>
</dependency> </dependency>
<!-- h2 database --> <!-- h2 database -->

View File

@ -6,6 +6,7 @@ import akka.actor.DeadLetter;
import akka.actor.Props; import akka.actor.Props;
import akka.routing.RoundRobinPool; import akka.routing.RoundRobinPool;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.typesafe.config.Config; import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
@ -22,9 +23,14 @@ import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.HttpUtils; import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.common.utils.NetUtils; import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.engine.EngineConfig;
import tech.powerjob.remote.framework.engine.EngineOutput;
import tech.powerjob.remote.framework.engine.RemoteEngine;
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
import tech.powerjob.worker.actors.ProcessorTrackerActor; import tech.powerjob.worker.actors.ProcessorTrackerActor;
import tech.powerjob.worker.actors.TaskTrackerActor; import tech.powerjob.worker.actors.TaskTrackerActor;
import tech.powerjob.worker.actors.TroubleshootingActor;
import tech.powerjob.worker.actors.WorkerActor; import tech.powerjob.worker.actors.WorkerActor;
import tech.powerjob.worker.background.OmsLogHandler; import tech.powerjob.worker.background.OmsLogHandler;
import tech.powerjob.worker.background.ServerDiscoveryService; import tech.powerjob.worker.background.ServerDiscoveryService;
@ -52,6 +58,7 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean
private final WorkerRuntime workerRuntime = new WorkerRuntime(); private final WorkerRuntime workerRuntime = new WorkerRuntime();
private RemoteEngine remoteEngine;
private final AtomicBoolean initialized = new AtomicBoolean(); private final AtomicBoolean initialized = new AtomicBoolean();
@Override @Override
@ -99,16 +106,15 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean
workerRuntime.setExecutorManager(executorManager); workerRuntime.setExecutorManager(executorManager);
// 初始化 ActorSystemmacOS上 new ServerSocket 检测端口占用的方法并不生效可能是AKKA是Scala写的缘故没办法...只能靠异常重试了 // 初始化 ActorSystemmacOS上 new ServerSocket 检测端口占用的方法并不生效可能是AKKA是Scala写的缘故没办法...只能靠异常重试了
Map<String, Object> overrideConfig = Maps.newHashMap(); EngineConfig engineConfig = new EngineConfig()
overrideConfig.put("akka.remote.artery.canonical.hostname", NetUtils.getLocalHost()); .setType("")
overrideConfig.put("akka.remote.artery.canonical.port", config.getPort()); .setServerType(ServerType.WORKER)
.setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort()))
.setActorList(Lists.newArrayList());
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME); remoteEngine = new PowerJobRemoteEngine();
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); EngineOutput engineOutput = remoteEngine.start(engineConfig);
workerRuntime.setTransporter(engineOutput.getTransporter());
int cores = Runtime.getRuntime().availableProcessors();
ActorSystem actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
workerRuntime.setActorSystem(actorSystem);
// 连接 server // 连接 server
ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig()); ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig());
@ -116,25 +122,10 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean
serverDiscoveryService.start(workerRuntime.getExecutorManager().getCoreExecutor()); serverDiscoveryService.start(workerRuntime.getExecutorManager().getCoreExecutor());
workerRuntime.setServerDiscoveryService(serverDiscoveryService); workerRuntime.setServerDiscoveryService(serverDiscoveryService);
ActorRef taskTrackerActorRef = actorSystem.actorOf(TaskTrackerActor.props(workerRuntime) log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");
.withDispatcher("akka.task-tracker-dispatcher")
.withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.TASK_TRACKER_ACTOR_NAME);
actorSystem.actorOf(ProcessorTrackerActor.props(workerRuntime)
.withDispatcher("akka.processor-tracker-dispatcher")
.withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
actorSystem.actorOf(WorkerActor.props(taskTrackerActorRef)
.withDispatcher("akka.worker-common-dispatcher")
.withRouter(new RoundRobinPool(cores)), RemoteConstant.WORKER_ACTOR_NAME);
// 处理系统中产生的异常情况
ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(TroubleshootingActor.class), RemoteConstant.TROUBLESHOOTING_ACTOR_NAME);
actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
log.info("[PowerJobWorker] akka-remote listening address: {}", workerAddress);
log.info("[PowerJobWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
// 初始化日志系统 // 初始化日志系统
OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, actorSystem, serverDiscoveryService); OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, workerRuntime.getTransporter(), serverDiscoveryService);
workerRuntime.setOmsLogHandler(omsLogHandler); workerRuntime.setOmsLogHandler(omsLogHandler);
// 初始化存储 // 初始化存储
@ -194,6 +185,6 @@ public class PowerJobWorker implements ApplicationContextAware, InitializingBean
@Override @Override
public void destroy() throws Exception { public void destroy() throws Exception {
workerRuntime.getExecutorManager().shutdown(); workerRuntime.getExecutorManager().shutdown();
workerRuntime.getActorSystem().terminate(); remoteEngine.close();
} }
} }

View File

@ -2,6 +2,10 @@ package tech.powerjob.worker.actors;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.Props; import akka.actor.Props;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.ProcessType;
import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.core.tracker.processor.ProcessorTracker; import tech.powerjob.worker.core.tracker.processor.ProcessorTracker;
import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager; import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager;
@ -21,29 +25,21 @@ import java.util.List;
* @since 2020/3/17 * @since 2020/3/17
*/ */
@Slf4j @Slf4j
@AllArgsConstructor @Actor(path = RemoteConstant.WPT_PATH)
public class ProcessorTrackerActor extends AbstractActor { public class ProcessorTrackerActor {
private final WorkerRuntime workerRuntime; private final WorkerRuntime workerRuntime;
public static Props props(WorkerRuntime workerRuntime) { public ProcessorTrackerActor(WorkerRuntime workerRuntime) {
return Props.create(ProcessorTrackerActor.class, () -> new ProcessorTrackerActor(workerRuntime)); this.workerRuntime = workerRuntime;
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(TaskTrackerStartTaskReq.class, this::onReceiveTaskTrackerStartTaskReq)
.match(TaskTrackerStopInstanceReq.class, this::onReceiveTaskTrackerStopInstanceReq)
.matchAny(obj -> log.warn("[ProcessorTrackerActor] receive unknown request: {}.", obj))
.build();
} }
/** /**
* 处理来自TaskTracker的task执行请求 * 处理来自TaskTracker的task执行请求
* @param req 请求 * @param req 请求
*/ */
private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) { @Handler(path = RemoteConstant.WPT_HANDLER_START_TASK, processType = ProcessType.NO_BLOCKING)
public void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
Long instanceId = req.getInstanceInfo().getInstanceId(); Long instanceId = req.getInstanceInfo().getInstanceId();
@ -68,7 +64,8 @@ public class ProcessorTrackerActor extends AbstractActor {
* 处理来自TaskTracker停止任务的请求 * 处理来自TaskTracker停止任务的请求
* @param req 请求 * @param req 请求
*/ */
private void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) { @Handler(path = RemoteConstant.WPT_HANDLER_STOP_INSTANCE)
public void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) {
Long instanceId = req.getInstanceId(); Long instanceId = req.getInstanceId();
List<ProcessorTracker> removedPts = ProcessorTrackerManager.removeProcessorTracker(instanceId); List<ProcessorTracker> removedPts = ProcessorTrackerManager.removeProcessorTracker(instanceId);

View File

@ -5,6 +5,7 @@ import akka.actor.Props;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.model.InstanceDetail;
@ -12,6 +13,8 @@ import tech.powerjob.common.request.ServerQueryInstanceStatusReq;
import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.ServerStopInstanceReq; import tech.powerjob.common.request.ServerStopInstanceReq;
import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.response.AskResponse;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager; import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager;
@ -26,6 +29,8 @@ import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import java.util.List; import java.util.List;
import static tech.powerjob.common.RemoteConstant.*;
/** /**
* worker master 节点处理来自 server jobInstance 请求和来自 worker 的task 请求 * worker master 节点处理来自 server jobInstance 请求和来自 worker 的task 请求
* *
@ -33,47 +38,33 @@ import java.util.List;
* @since 2020/3/17 * @since 2020/3/17
*/ */
@Slf4j @Slf4j
@AllArgsConstructor @Actor(path = WTT_PATH)
public class TaskTrackerActor extends AbstractActor { public class TaskTrackerActor {
private final WorkerRuntime workerRuntime; private final WorkerRuntime workerRuntime;
public static Props props(WorkerRuntime workerRuntime) { public TaskTrackerActor(WorkerRuntime workerRuntime) {
return Props.create(TaskTrackerActor.class, () -> new TaskTrackerActor(workerRuntime)); this.workerRuntime = workerRuntime;
} }
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ProcessorReportTaskStatusReq.class, this::onReceiveProcessorReportTaskStatusReq)
.match(ServerScheduleJobReq.class, this::onReceiveServerScheduleJobReq)
.match(ProcessorMapTaskRequest.class, this::onReceiveProcessorMapTaskRequest)
.match(ProcessorTrackerStatusReportReq.class, this::onReceiveProcessorTrackerStatusReportReq)
.match(ServerStopInstanceReq.class, this::onReceiveServerStopInstanceReq)
.match(ServerQueryInstanceStatusReq.class, this::onReceiveServerQueryInstanceStatusReq)
.matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj))
.build();
}
/** /**
* 子任务状态上报 处理器 * 子任务状态上报 处理器
*/ */
private void onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) { @Handler(path = WTT_HANDLER_REPORT_TASK_STATUS)
public AskResponse onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) {
int taskStatus = req.getStatus(); int taskStatus = req.getStatus();
// 只有重量级任务才会有两级任务状态上报的机制 // 只有重量级任务才会有两级任务状态上报的机制
HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());
// 结束状态需要回复接受成功 // 结束状态需要回复接受成功
if (TaskStatus.FINISHED_STATUS.contains(taskStatus)) { if (TaskStatus.FINISHED_STATUS.contains(taskStatus)) {
AskResponse askResponse = AskResponse.succeed(null); return AskResponse.succeed(null);
getSender().tell(askResponse, getSelf());
} }
// 手动停止 TaskTracker 的情况下会出现这种情况 // 手动停止 TaskTracker 的情况下会出现这种情况
if (taskTracker == null) { if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req); log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req);
return; return null;
} }
if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) { if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) {
@ -84,17 +75,20 @@ public class TaskTrackerActor extends AbstractActor {
// 更新工作流上下文 // 更新工作流上下文
taskTracker.updateAppendedWfContext(req.getAppendedWfContext()); taskTracker.updateAppendedWfContext(req.getAppendedWfContext());
return null;
} }
/** /**
* 子任务 map 处理器 * 子任务 map 处理器
*/ */
private void onReceiveProcessorMapTaskRequest(ProcessorMapTaskRequest req) { @Handler(path = WTT_HANDLER_MAP_TASK)
public AskResponse onReceiveProcessorMapTaskRequest(ProcessorMapTaskRequest req) {
HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());
if (taskTracker == null) { if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive ProcessorMapTaskRequest({}) but system can't find TaskTracker.", req); log.warn("[TaskTrackerActor] receive ProcessorMapTaskRequest({}) but system can't find TaskTracker.", req);
return; return null;
} }
boolean success = false; boolean success = false;
@ -121,13 +115,14 @@ public class TaskTrackerActor extends AbstractActor {
AskResponse response = new AskResponse(); AskResponse response = new AskResponse();
response.setSuccess(success); response.setSuccess(success);
getSender().tell(response, getSelf()); return response;
} }
/** /**
* 服务器任务调度处理器 * 服务器任务调度处理器
*/ */
private void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) { @Handler(path = WTT_HANDLER_RUN_JOB)
public void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) {
log.debug("[TaskTrackerActor] server schedule job by request: {}.", req); log.debug("[TaskTrackerActor] server schedule job by request: {}.", req);
Long instanceId = req.getInstanceId(); Long instanceId = req.getInstanceId();
// 区分轻量级任务模型以及重量级任务模型 // 区分轻量级任务模型以及重量级任务模型
@ -168,7 +163,8 @@ public class TaskTrackerActor extends AbstractActor {
/** /**
* ProcessorTracker 心跳处理器 * ProcessorTracker 心跳处理器
*/ */
private void onReceiveProcessorTrackerStatusReportReq(ProcessorTrackerStatusReportReq req) { @Handler(path = WTT_HANDLER_REPORT_PROCESSOR_TRACKER_STATUS)
public void onReceiveProcessorTrackerStatusReportReq(ProcessorTrackerStatusReportReq req) {
HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());
if (taskTracker == null) { if (taskTracker == null) {
@ -181,8 +177,8 @@ public class TaskTrackerActor extends AbstractActor {
/** /**
* 停止任务实例 * 停止任务实例
*/ */
private void onReceiveServerStopInstanceReq(ServerStopInstanceReq req) { @Handler(path = WTT_HANDLER_STOP_INSTANCE)
public void onReceiveServerStopInstanceReq(ServerStopInstanceReq req) {
log.info("[TaskTrackerActor] receive ServerStopInstanceReq({}).", req); log.info("[TaskTrackerActor] receive ServerStopInstanceReq({}).", req);
HeavyTaskTracker heavyTaskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); HeavyTaskTracker heavyTaskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());
@ -201,7 +197,8 @@ public class TaskTrackerActor extends AbstractActor {
/** /**
* 查询任务实例运行状态 * 查询任务实例运行状态
*/ */
private void onReceiveServerQueryInstanceStatusReq(ServerQueryInstanceStatusReq req) { @Handler(path = WTT_HANDLER_QUERY_INSTANCE_STATUS)
public AskResponse onReceiveServerQueryInstanceStatusReq(ServerQueryInstanceStatusReq req) {
AskResponse askResponse; AskResponse askResponse;
TaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); TaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());
if (taskTracker == null && (taskTracker = LightTaskTrackerManager.getTaskTracker(req.getInstanceId())) == null) { if (taskTracker == null && (taskTracker = LightTaskTrackerManager.getTaskTracker(req.getInstanceId())) == null) {
@ -211,7 +208,7 @@ public class TaskTrackerActor extends AbstractActor {
InstanceDetail instanceDetail = taskTracker.fetchRunningStatus(); InstanceDetail instanceDetail = taskTracker.fetchRunningStatus();
askResponse = AskResponse.succeed(instanceDetail); askResponse = AskResponse.succeed(instanceDetail);
} }
getSender().tell(askResponse, getSelf()); return askResponse;
} }

View File

@ -1,25 +0,0 @@
package tech.powerjob.worker.actors;
import akka.actor.AbstractActor;
import akka.actor.DeadLetter;
import lombok.extern.slf4j.Slf4j;
/**
* 处理系统异常的 Actor
*
* @author 朱八
* @since 2020/7/16
*/
@Slf4j
public class TroubleshootingActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(DeadLetter.class, this::onReceiveDeadLetter)
.build();
}
public void onReceiveDeadLetter(DeadLetter dl) {
log.warn("[TroubleshootingActor] receive DeadLetter: {}", dl);
}
}

View File

@ -1,15 +1,14 @@
package tech.powerjob.worker.background; package tech.powerjob.worker.background;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import tech.powerjob.common.enums.LogLevel; import tech.powerjob.common.enums.LogLevel;
import tech.powerjob.common.model.InstanceLogContent; import tech.powerjob.common.model.InstanceLogContent;
import tech.powerjob.common.request.WorkerLogReportReq; import tech.powerjob.common.request.WorkerLogReportReq;
import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.remote.framework.transporter.Transporter;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Queues; import com.google.common.collect.Queues;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -27,7 +26,7 @@ import java.util.concurrent.locks.ReentrantLock;
public class OmsLogHandler { public class OmsLogHandler {
private final String workerAddress; private final String workerAddress;
private final ActorSystem actorSystem; private final Transporter transporter;
private final ServerDiscoveryService serverDiscoveryService; private final ServerDiscoveryService serverDiscoveryService;
// 处理线程需要通过线程池启动 // 处理线程需要通过线程池启动
@ -42,9 +41,9 @@ public class OmsLogHandler {
// 本地囤积阈值 // 本地囤积阈值
private static final int REPORT_SIZE = 1024; private static final int REPORT_SIZE = 1024;
public OmsLogHandler(String workerAddress, ActorSystem actorSystem, ServerDiscoveryService serverDiscoveryService) { public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) {
this.workerAddress = workerAddress; this.workerAddress = workerAddress;
this.actorSystem = actorSystem; this.transporter = transporter;
this.serverDiscoveryService = serverDiscoveryService; this.serverDiscoveryService = serverDiscoveryService;
} }
@ -81,9 +80,9 @@ public class OmsLogHandler {
try { try {
String serverPath = AkkaUtils.getServerActorPath(serverDiscoveryService.getCurrentServerAddress()); final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();
// 当前无可用 Server // 当前无可用 Server
if (StringUtils.isEmpty(serverPath)) { if (StringUtils.isEmpty(currentServerAddress)) {
if (!logQueue.isEmpty()) { if (!logQueue.isEmpty()) {
logQueue.clear(); logQueue.clear();
log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs."); log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");
@ -91,7 +90,6 @@ public class OmsLogHandler {
return; return;
} }
ActorSelection serverActor = actorSystem.actorSelection(serverPath);
List<InstanceLogContent> logs = Lists.newLinkedList(); List<InstanceLogContent> logs = Lists.newLinkedList();
while (!logQueue.isEmpty()) { while (!logQueue.isEmpty()) {
@ -102,7 +100,7 @@ public class OmsLogHandler {
if (logs.size() >= BATCH_SIZE) { if (logs.size() >= BATCH_SIZE) {
WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs)); WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));
// 不可靠请求WEB日志不追求极致 // 不可靠请求WEB日志不追求极致
serverActor.tell(req, null); TransportUtils.reportLogs(req, currentServerAddress, transporter);
logs.clear(); logs.clear();
} }
@ -113,7 +111,7 @@ public class OmsLogHandler {
if (!logs.isEmpty()) { if (!logs.isEmpty()) {
WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs); WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);
serverActor.tell(req, null); TransportUtils.reportLogs(req, currentServerAddress, transporter);
} }
}finally { }finally {

View File

@ -1,6 +1,6 @@
package tech.powerjob.worker.common; package tech.powerjob.worker.common;
import akka.actor.ActorSystem; import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.worker.background.OmsLogHandler; import tech.powerjob.worker.background.OmsLogHandler;
import tech.powerjob.worker.background.ServerDiscoveryService; import tech.powerjob.worker.background.ServerDiscoveryService;
import tech.powerjob.worker.background.WorkerHealthReporter; import tech.powerjob.worker.background.WorkerHealthReporter;
@ -23,7 +23,7 @@ public class WorkerRuntime {
private PowerJobWorkerConfig workerConfig; private PowerJobWorkerConfig workerConfig;
private ActorSystem actorSystem; private Transporter transporter;
private WorkerHealthReporter healthReporter; private WorkerHealthReporter healthReporter;

View File

@ -0,0 +1,86 @@
package tech.powerjob.worker.common.utils;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.exception.PowerJobCheckedException;
import tech.powerjob.common.request.WorkerLogReportReq;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.HandlerLocation;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest;
import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static tech.powerjob.common.RemoteConstant.*;
/**
* 通讯工具
*
* @author tjq
* @since 2023/1/20
*/
@Slf4j
public class TransportUtils {
public static void ptReportTask(ProcessorReportTaskStatusReq req, String address, WorkerRuntime workerRuntime) {
final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_TASK_STATUS, address);
workerRuntime.getTransporter().tell(url, req);
}
public static void ptReportSelfStatus(ProcessorTrackerStatusReportReq req, String address, WorkerRuntime workerRuntime) {
final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_PROCESSOR_TRACKER_STATUS, address);
workerRuntime.getTransporter().tell(url, req);
}
public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {
final URL url = easyBuildUrl(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_REPORT_LOG, address);
transporter.tell(url, req);
}
public static boolean reliablePtReportTask(ProcessorReportTaskStatusReq req, String address, WorkerRuntime workerRuntime) {
try {
final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_TASK_STATUS, address);
final CompletionStage<AskResponse> completionStage = workerRuntime.getTransporter().ask(url, req, AskResponse.class);
return completionStage
.toCompletableFuture()
.get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.isSuccess();
} catch (Exception e) {
log.warn("[PowerJobTransport] reliablePtReportTask failed: {}", req, e);
return false;
}
}
public static boolean reliableMapTask(ProcessorMapTaskRequest req, String address, WorkerRuntime workerRuntime) throws PowerJobCheckedException {
try {
final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address);
final CompletionStage<AskResponse> completionStage = workerRuntime.getTransporter().ask(url, req, AskResponse.class);
return completionStage
.toCompletableFuture()
.get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.isSuccess();
} catch (Throwable throwable) {
throw new PowerJobCheckedException(throwable);
}
}
public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {
HandlerLocation handlerLocation = new HandlerLocation()
.setServerType(serverType)
.setRootPath(rootPath)
.setMethodPath(handlerPath);
return new URL()
.setAddress(Address.fromIpv4(address))
.setLocation(handlerLocation);
}
}

View File

@ -1,6 +1,5 @@
package tech.powerjob.worker.core.processor.runnable; package tech.powerjob.worker.core.processor.runnable;
import akka.actor.ActorSelection;
import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.ThreadLocalStore; import tech.powerjob.worker.common.ThreadLocalStore;
@ -8,6 +7,7 @@ import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.common.serialize.SerializerUtils; import tech.powerjob.common.serialize.SerializerUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.common.utils.WorkflowContextUtils; import tech.powerjob.worker.common.utils.WorkflowContextUtils;
import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.TaskContext;
@ -45,7 +45,7 @@ public class HeavyProcessorRunnable implements Runnable {
private final InstanceInfo instanceInfo; private final InstanceInfo instanceInfo;
private final ActorSelection taskTrackerActor; private final String taskTrackerAddress;
private final TaskDO task; private final TaskDO task;
private final BasicProcessor processor; private final BasicProcessor processor;
private final OmsLogger omsLogger; private final OmsLogger omsLogger;
@ -222,14 +222,14 @@ public class HeavyProcessorRunnable implements Runnable {
// 最终结束状态要求可靠发送 // 最终结束状态要求可靠发送
if (TaskStatus.FINISHED_STATUS.contains(status.getValue())) { if (TaskStatus.FINISHED_STATUS.contains(status.getValue())) {
boolean success = AkkaUtils.reliableTransmit(taskTrackerActor, req); boolean success = TransportUtils.reliablePtReportTask(req, taskTrackerAddress, workerRuntime);
if (!success) { if (!success) {
// 插入重试队列等待重试 // 插入重试队列等待重试
statusReportRetryQueue.add(req); statusReportRetryQueue.add(req);
log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed, will retry later", task.getInstanceId(), task.getTaskId(), status, result); log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed, will retry later", task.getInstanceId(), task.getTaskId(), status, result);
} }
} else { } else {
taskTrackerActor.tell(req, null); TransportUtils.ptReportTask(req, taskTrackerAddress, workerRuntime);
} }
} }

View File

@ -9,6 +9,7 @@ import tech.powerjob.worker.common.ThreadLocalStore;
import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest; import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest;
@ -55,8 +56,7 @@ public interface MapProcessor extends BasicProcessor {
ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName); ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);
// 2. 可靠发送请求任务不允许丢失需要使用 ask 方法失败抛异常 // 2. 可靠发送请求任务不允许丢失需要使用 ask 方法失败抛异常
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.TASK_TRACKER_ACTOR_NAME); boolean requestSucceed = TransportUtils.reliableMapTask(req, task.getAddress(), workerRuntime);
boolean requestSucceed = AkkaUtils.reliableTransmit(workerRuntime.getActorSystem().actorSelection(akkaRemotePath), req);
if (requestSucceed) { if (requestSucceed) {
log.info("[Map-{}] map task[name={},num={}] successfully!", task.getInstanceId(), taskName, taskList.size()); log.info("[Map-{}] map task[name={},num={}] successfully!", task.getInstanceId(), taskName, taskList.size());

View File

@ -1,6 +1,5 @@
package tech.powerjob.worker.core.tracker.processor; package tech.powerjob.worker.core.tracker.processor;
import akka.actor.ActorSelection;
import com.google.common.collect.Queues; import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -16,6 +15,7 @@ import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.AkkaUtils; import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.core.processor.ProcessorInfo; import tech.powerjob.worker.core.processor.ProcessorInfo;
import tech.powerjob.worker.core.processor.runnable.HeavyProcessorRunnable; import tech.powerjob.worker.core.processor.runnable.HeavyProcessorRunnable;
import tech.powerjob.worker.core.processor.ProcessorLoader; import tech.powerjob.worker.core.processor.ProcessorLoader;
@ -76,8 +76,6 @@ public class ProcessorTracker {
private String taskTrackerAddress; private String taskTrackerAddress;
private ActorSelection taskTrackerActorRef;
private ThreadPoolExecutor threadPool; private ThreadPoolExecutor threadPool;
private ScheduledExecutorService timingPool; private ScheduledExecutorService timingPool;
@ -107,9 +105,6 @@ public class ProcessorTracker {
this.instanceId = request.getInstanceInfo().getInstanceId(); this.instanceId = request.getInstanceInfo().getInstanceId();
this.taskTrackerAddress = request.getTaskTrackerAddress(); this.taskTrackerAddress = request.getTaskTrackerAddress();
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
this.taskTrackerActorRef = workerRuntime.getActorSystem().actorSelection(akkaRemotePath);
this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime); this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime);
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue(); this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
this.lastIdleTime = -1L; this.lastIdleTime = -1L;
@ -152,7 +147,7 @@ public class ProcessorTracker {
.setResult(lethalReason) .setResult(lethalReason)
.setReportTime(System.currentTimeMillis()); .setReportTime(System.currentTimeMillis());
taskTrackerActorRef.tell(report, null); TransportUtils.ptReportTask(report, taskTrackerAddress, workerRuntime);
return; return;
} }
@ -162,7 +157,7 @@ public class ProcessorTracker {
newTask.setAddress(taskTrackerAddress); newTask.setAddress(taskTrackerAddress);
ClassLoader classLoader = processorInfo.getClassLoader(); ClassLoader classLoader = processorInfo.getClassLoader();
HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processorInfo.getBasicProcessor(), omsLogger, classLoader, statusReportRetryQueue, workerRuntime); HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorInfo.getBasicProcessor(), omsLogger, classLoader, statusReportRetryQueue, workerRuntime);
try { try {
threadPool.submit(heavyProcessorRunnable); threadPool.submit(heavyProcessorRunnable);
success = true; success = true;
@ -182,7 +177,7 @@ public class ProcessorTracker {
reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
reportReq.setReportTime(System.currentTimeMillis()); reportReq.setReportTime(System.currentTimeMillis());
taskTrackerActorRef.tell(reportReq, null); TransportUtils.ptReportTask(reportReq, taskTrackerAddress, workerRuntime);
log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.", log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size()); instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
@ -203,7 +198,6 @@ public class ProcessorTracker {
}); });
// 2. 去除顶层引用送入GC世界 // 2. 去除顶层引用送入GC世界
taskTrackerActorRef = null;
statusReportRetryQueue.clear(); statusReportRetryQueue.clear();
ProcessorTrackerManager.removeProcessorTracker(instanceId); ProcessorTrackerManager.removeProcessorTracker(instanceId);
@ -281,7 +275,7 @@ public class ProcessorTracker {
// 不可靠通知如果该请求失败则整个任务处理集群缺失一个 ProcessorTracker影响可接受 // 不可靠通知如果该请求失败则整个任务处理集群缺失一个 ProcessorTracker影响可接受
ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId); ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
statusReportReq.setAddress(workerRuntime.getWorkerAddress()); statusReportReq.setAddress(workerRuntime.getWorkerAddress());
taskTrackerActorRef.tell(statusReportReq, null); TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
destroy(); destroy();
return; return;
} }
@ -293,7 +287,7 @@ public class ProcessorTracker {
ProcessorReportTaskStatusReq req = statusReportRetryQueue.poll(); ProcessorReportTaskStatusReq req = statusReportRetryQueue.poll();
if (req != null) { if (req != null) {
req.setReportTime(System.currentTimeMillis()); req.setReportTime(System.currentTimeMillis());
if (!AkkaUtils.reliableTransmit(taskTrackerActorRef, req)) { if (!TransportUtils.reliablePtReportTask(req, taskTrackerAddress, workerRuntime)) {
statusReportRetryQueue.add(req); statusReportRetryQueue.add(req);
log.warn("[ProcessorRunnable-{}] retry report finished task status failed: {}", instanceId, req); log.warn("[ProcessorRunnable-{}] retry report finished task status failed: {}", instanceId, req);
return; return;
@ -305,7 +299,7 @@ public class ProcessorTracker {
long waitingNum = threadPool.getQueue().size(); long waitingNum = threadPool.getQueue().size();
ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum); ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum);
statusReportReq.setAddress(workerRuntime.getWorkerAddress()); statusReportReq.setAddress(workerRuntime.getWorkerAddress());
taskTrackerActorRef.tell(statusReportReq, null); TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum); log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum);
} }