feat: change to use PowerJobRemoteEngine to replace akka

This commit is contained in:
tjq 2023-01-20 13:18:58 +08:00
parent 43df09bb38
commit d46a6de26e
23 changed files with 115 additions and 440 deletions

View File

@ -36,7 +36,29 @@ public class RemoteConstant {
/* ************************ SERVER ************************ */
public static final String SERVER_PATH = "server";
/**
* Server handler for online log reports.
*/
public static final String SERVER_HANDLER_REPORT_LOG = "reportLog";
/**
* Server handler for worker heartbeats.
*/
public static final String SERVER_HANDLER_WORKER_HEARTBEAT = "workerHeartbeat";
/**
* Server handler for the job instance status reported by a TaskTracker.
*/
public static final String SERVER_HANDLER_REPORT_INSTANCE_STATUS = "reportInstanceStatus";
/**
* Server handler for querying the cluster that can execute a job.
*/
public static final String SERVER_HANDLER_QUERY_JOB_CLUSTER = "queryJobCluster";
/**
* Server handler for a worker's request to deploy a container.
*/
public static final String SERVER_HANDLER_WORKER_NEED_DEPLOY_CONTAINER = "container";
/* ************************ Worker-TaskTracker ************************ */
public static final String WTT_PATH = "taskTracker";

View File

@ -1,22 +1,13 @@
package tech.powerjob.worker;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;

View File

@ -1,20 +1,17 @@
package tech.powerjob.worker.actors;
import akka.actor.AbstractActor;
import akka.actor.Props;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.ProcessType;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.core.tracker.processor.ProcessorTracker;
import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager;
import tech.powerjob.worker.core.tracker.processor.ProcessorTracker;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.List;

View File

@ -1,11 +1,7 @@
package tech.powerjob.worker.actors;
import akka.actor.AbstractActor;
import akka.actor.Props;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.InstanceDetail;

View File

@ -1,17 +1,16 @@
package tech.powerjob.worker.background;
import akka.actor.ActorSelection;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.model.SystemMetrics;
import tech.powerjob.common.request.WorkerHeartbeat;
import tech.powerjob.worker.common.PowerJobWorkerVersion;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.SystemInfoUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.container.OmsContainerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager;
import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager;
@ -68,8 +67,7 @@ public class WorkerHealthReporter implements Runnable {
// 获取当前加载的容器列表
heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos());
// 发送请求
String serverPath = AkkaUtils.getServerActorPath(currentServer);
if (StringUtils.isEmpty(serverPath)) {
if (StringUtils.isEmpty(currentServer)) {
return;
}
// log
@ -82,7 +80,7 @@ public class WorkerHealthReporter implements Runnable {
workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum(),
heartbeat.getHeavyTaskTrackerNum()
);
ActorSelection actorSelection = workerRuntime.getActorSystem().actorSelection(serverPath);
actorSelection.tell(heartbeat, null);
TransportUtils.reportWorkerHeartbeat(heartbeat, currentServer, workerRuntime.getTransporter());
}
}

View File

@ -1,64 +0,0 @@
package tech.powerjob.worker.common.utils;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.RemoteConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
 * Helper methods for building Akka actor paths and for ask-style messaging.
 *
 * @author tjq
 * @since 2020/3/17
 */
@Slf4j
public class AkkaUtils {

    /**
     * akka://<actor system>@<hostname>:<port>/<actor path>
     */
    private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";

    /**
     * Builds the path of an actor inside the worker actor system.
     *
     * @param address   address substituted into the host part of the path
     * @param actorName name of the target actor
     * @return the formatted actor path
     */
    public static String getAkkaWorkerPath(String address, String actorName) {
        final String workerSystem = RemoteConstant.WORKER_ACTOR_SYSTEM_NAME;
        return String.format(AKKA_NODE_PATH, workerSystem, address, actorName);
    }

    /**
     * Builds the path of the server actor.
     *
     * @param serverAddress address of the server; may be empty
     * @return the formatted actor path, or {@code null} when {@code serverAddress} is empty
     */
    public static String getServerActorPath(String serverAddress) {
        return StringUtils.isEmpty(serverAddress)
                ? null
                : String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, serverAddress, RemoteConstant.SERVER_ACTOR_NAME);
    }

    /**
     * Reliable transmission.
     *
     * @param remote the remote AKKA node
     * @param msg    the object to transmit
     * @return {@code true} if the remote side reported success; {@code false} if the ask failed
     *         or the response was unsuccessful (the message may have been delivered while the
     *         remote processing failed, as reflected by the {@link AskResponse})
     */
    public static boolean reliableTransmit(ActorSelection remote, Object msg) {
        boolean accepted = false;
        try {
            accepted = easyAsk(remote, msg).isSuccess();
        } catch (Exception e) {
            // Failures are logged and reported to the caller as "not accepted".
            log.warn("[PowerTransmitter] transmit {} failed", msg, e);
        }
        return accepted;
    }

    /**
     * Asks the remote actor and blocks until a reply arrives or the default timeout elapses.
     *
     * @param remote the remote actor selection
     * @param msg    the message to send
     * @return the reply cast to {@link AskResponse}
     * @throws PowerJobException wrapping any exception raised while asking or waiting
     */
    public static AskResponse easyAsk(ActorSelection remote, Object msg) {
        try {
            final CompletionStage<Object> pending =
                    Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
            final Object reply = pending
                    .toCompletableFuture()
                    .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            return (AskResponse) reply;
        } catch (Exception e) {
            throw new PowerJobException(e);
        }
    }
}

View File

@ -1,9 +1,11 @@
package tech.powerjob.worker.common.utils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.exception.PowerJobCheckedException;
import tech.powerjob.common.request.WorkerLogReportReq;
import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.HandlerLocation;
@ -11,9 +13,7 @@ import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest;
import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import tech.powerjob.worker.pojo.request.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
@ -29,6 +29,21 @@ import static tech.powerjob.common.RemoteConstant.*;
@Slf4j
public class TransportUtils {
/**
 * Sends a TaskTracker instance-status report to the server's report-instance-status handler
 * via {@code Transporter#tell}; no response is read here.
 *
 * @param req         the instance status report
 * @param address     address of the target server
 * @param transporter transporter used to send the request
 */
public static void ttReportInstanceStatus(TaskTrackerReportInstanceStatusReq req, String address, Transporter transporter) {
    transporter.tell(easyBuildUrl(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_REPORT_INSTANCE_STATUS, address), req);
}
/**
 * Sends a start-task request to the worker at {@code address} (handler
 * {@code WPT_PATH}/{@code WPT_HANDLER_START_TASK}) via {@code Transporter#tell};
 * no response is read here.
 *
 * @param req         the start-task request
 * @param address     address of the target worker
 * @param transporter transporter used to send the request
 */
public static void ttStartPtTask(TaskTrackerStartTaskReq req, String address, Transporter transporter) {
    final URL startTaskUrl = easyBuildUrl(ServerType.WORKER, WPT_PATH, WPT_HANDLER_START_TASK, address);
    transporter.tell(startTaskUrl, req);
}
/**
 * Sends a stop-instance request to the worker at {@code address} (handler
 * {@code WPT_PATH}/{@code WPT_HANDLER_STOP_INSTANCE}) via {@code Transporter#tell};
 * no response is read here.
 *
 * @param req         the stop-instance request
 * @param address     address of the target worker
 * @param transporter transporter used to send the request
 */
public static void ttStopPtInstance(TaskTrackerStopInstanceReq req, String address, Transporter transporter) {
    transporter.tell(easyBuildUrl(ServerType.WORKER, WPT_PATH, WPT_HANDLER_STOP_INSTANCE, address), req);
}
public static void ptReportTask(ProcessorReportTaskStatusReq req, String address, WorkerRuntime workerRuntime) {
final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_TASK_STATUS, address);
workerRuntime.getTransporter().tell(url, req);
@ -44,15 +59,14 @@ public class TransportUtils {
transporter.tell(url, req);
}
/**
 * Sends a worker heartbeat to the server's worker-heartbeat handler via
 * {@code Transporter#tell}; no response is read here.
 *
 * @param req         the heartbeat payload
 * @param address     address of the target server
 * @param transporter transporter used to send the request
 */
public static void reportWorkerHeartbeat(WorkerHeartbeat req, String address, Transporter transporter) {
    final URL heartbeatUrl = easyBuildUrl(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_WORKER_HEARTBEAT, address);
    transporter.tell(heartbeatUrl, req);
}
public static boolean reliablePtReportTask(ProcessorReportTaskStatusReq req, String address, WorkerRuntime workerRuntime) {
try {
final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_TASK_STATUS, address);
final CompletionStage<AskResponse> completionStage = workerRuntime.getTransporter().ask(url, req, AskResponse.class);
return completionStage
.toCompletableFuture()
.get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.isSuccess();
return reliableAsk(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_TASK_STATUS, address, req, workerRuntime.getTransporter()).isSuccess();
} catch (Exception e) {
log.warn("[PowerJobTransport] reliablePtReportTask failed: {}", req, e);
return false;
@ -61,18 +75,35 @@ public class TransportUtils {
public static boolean reliableMapTask(ProcessorMapTaskRequest req, String address, WorkerRuntime workerRuntime) throws PowerJobCheckedException {
try {
final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address);
final CompletionStage<AskResponse> completionStage = workerRuntime.getTransporter().ask(url, req, AskResponse.class);
return completionStage
.toCompletableFuture()
.get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.isSuccess();
return reliableAsk(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address, req, workerRuntime.getTransporter()).isSuccess();
} catch (Throwable throwable) {
throw new PowerJobCheckedException(throwable);
}
}
/**
 * Reports a TaskTracker instance status through {@code reliableAsk}, passing the server
 * report-instance-status handler coordinates, and returns whether the response was successful.
 * Exceptions thrown by {@code reliableAsk} propagate undeclared because of {@code @SneakyThrows}.
 *
 * @param req         the instance status report
 * @param address     address of the target server
 * @param transporter transporter used to send the request
 * @return {@code true} if the returned {@link AskResponse} reports success
 */
@SneakyThrows
public static boolean reliableTtReportInstanceStatus(TaskTrackerReportInstanceStatusReq req, String address, Transporter transporter) {
    final AskResponse response = reliableAsk(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_REPORT_INSTANCE_STATUS, address, req, transporter);
    return response.isSuccess();
}
/**
 * Queries the executor cluster of a job through {@code reliableAsk}, passing the server
 * query-job-cluster handler coordinates.
 * Exceptions thrown by {@code reliableAsk} propagate undeclared because of {@code @SneakyThrows}.
 *
 * @param req         the cluster query request
 * @param address     address of the target server
 * @param transporter transporter used to send the request
 * @return the response returned by {@code reliableAsk}
 */
@SneakyThrows
public static AskResponse reliableQueryJobCluster(WorkerQueryExecutorClusterReq req, String address, Transporter transporter) {
    final AskResponse clusterResponse = reliableAsk(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_QUERY_JOB_CLUSTER, address, req, transporter);
    return clusterResponse;
}
/**
 * Requests container deployment information through {@code reliableAsk}, passing the server
 * worker-need-deploy-container handler coordinates.
 * Exceptions thrown by {@code reliableAsk} propagate undeclared because of {@code @SneakyThrows}.
 *
 * @param req         the container deployment request
 * @param address     address of the target server
 * @param transporter transporter used to send the request
 * @return the response returned by {@code reliableAsk}
 */
@SneakyThrows
public static AskResponse reliableQueryContainerInfo(WorkerNeedDeployContainerRequest req, String address, Transporter transporter) {
    final AskResponse containerResponse = reliableAsk(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, address, req, transporter);
    return containerResponse;
}
/**
 * Sends {@code req} with {@code Transporter#ask} to the handler identified by the given
 * server type, root path and handler path, then blocks for the response.
 *
 * <p>The URL is built from the caller-supplied {@code t}, {@code rootPath} and
 * {@code handlerPath}. Previously these were ignored and every request was addressed to the
 * worker map-task handler, regardless of which handler the caller asked for.
 *
 * @param t           type of the node that hosts the target handler
 * @param rootPath    root path of the target actor
 * @param handlerPath handler path under {@code rootPath}
 * @param address     address of the target node
 * @param req         request payload
 * @param transporter transporter used to send the request
 * @return the response received from the remote handler
 * @throws Exception if the ask fails or no response arrives within
 *                   {@code RemoteConstant.DEFAULT_TIMEOUT_MS} milliseconds
 */
private static AskResponse reliableAsk(ServerType t, String rootPath, String handlerPath, String address, PowerSerializable req, Transporter transporter) throws Exception {
    // Route by the arguments, not by a fixed handler, so each caller reaches its own target.
    final URL url = easyBuildUrl(t, rootPath, handlerPath, address);
    final CompletionStage<AskResponse> completionStage = transporter.ask(url, req, AskResponse.class);
    return completionStage
            .toCompletableFuture()
            .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {
HandlerLocation handlerLocation = new HandlerLocation()
.setServerType(serverType)

View File

@ -1,8 +1,5 @@
package tech.powerjob.worker.container;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.common.request.ServerDeployContainerRequest;
import tech.powerjob.common.request.WorkerNeedDeployContainerRequest;
@ -12,14 +9,13 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.TransportUtils;
import java.io.File;
import java.net.URL;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* 容器工厂
@ -39,16 +35,14 @@ public class OmsContainerFactory {
* @param serverActor 当容器不存在且 serverActor 非空时尝试从服务端重新拉取容器
* @return 容器示例可能为 null
*/
public static OmsContainer fetchContainer(Long containerId, ActorSelection serverActor) {
public static OmsContainer fetchContainer(Long containerId, WorkerRuntime workerRuntime) {
OmsContainer omsContainer = CARGO.get(containerId);
if (omsContainer != null) {
return omsContainer;
}
if (serverActor == null) {
return null;
}
final String currentServerAddress = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
// 尝试从 server 加载
log.info("[OmsContainer-{}] can't find the container in factory, try to deploy from server.", containerId);
@ -56,8 +50,7 @@ public class OmsContainerFactory {
try {
CompletionStage<Object> askCS = Patterns.ask(serverActor, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
AskResponse askResponse = TransportUtils.reliableQueryContainerInfo(request, currentServerAddress, workerRuntime.getTransporter());
if (askResponse.isSuccess()) {
ServerDeployContainerRequest deployRequest = askResponse.getData(ServerDeployContainerRequest.class);

View File

@ -1,12 +1,10 @@
package tech.powerjob.worker.core.processor;
import akka.actor.ActorSelection;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.SpringUtils;
import tech.powerjob.worker.container.OmsContainer;
import tech.powerjob.worker.container.OmsContainerFactory;
@ -62,9 +60,7 @@ public class ProcessorLoader {
String[] split = processorInfo.split("#");
log.info("[ProcessorLoader] try to load processor({}) in container({})", split[1], split[0]);
String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
ActorSelection actorSelection = workerRuntime.getActorSystem().actorSelection(serverPath);
OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), actorSelection);
OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), workerRuntime);
if (omsContainer != null) {
processorInfoHolder = ProcessorInfo.of(omsContainer.getProcessor(split[1]), omsContainer.getContainerClassLoader());
} else {

View File

@ -5,7 +5,6 @@ import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.ThreadLocalStore;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.common.serialize.SerializerUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.common.utils.WorkflowContextUtils;

View File

@ -3,12 +3,10 @@ package tech.powerjob.worker.core.processor.sdk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.exception.PowerJobCheckedException;
import tech.powerjob.worker.common.ThreadLocalStore;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest;

View File

@ -9,12 +9,9 @@ import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.core.processor.ProcessorInfo;
import tech.powerjob.worker.core.processor.runnable.HeavyProcessorRunnable;
@ -22,7 +19,6 @@ import tech.powerjob.worker.core.processor.ProcessorLoader;
import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager;
import tech.powerjob.worker.log.OmsLogger;
import tech.powerjob.worker.log.OmsLoggerFactory;
import tech.powerjob.worker.log.impl.OmsServerLogger;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.pojo.model.InstanceInfo;
import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;

View File

@ -1,7 +1,5 @@
package tech.powerjob.worker.core.tracker.task;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
@ -9,16 +7,12 @@ import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.pojo.model.InstanceInfo;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -107,18 +101,15 @@ public abstract class TaskTracker {
response.setStartTime(System.currentTimeMillis());
response.setSourceAddress(workerRuntime.getWorkerAddress());
String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath);
serverActor.tell(response, null);
TransportUtils.ttReportInstanceStatus(response, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter());
}
protected void reportFinalStatusThenDestroy(ActorSelection serverActor, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) {
protected void reportFinalStatusThenDestroy(WorkerRuntime workerRuntime, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) {
String currentServerAddress = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
// 最终状态需要可靠上报
CompletionStage<Object> ask = Patterns.ask(serverActor, reportInstanceStatusReq, Duration.ofSeconds(15));
boolean serverAccepted = false;
try {
AskResponse askResponse = (AskResponse) ask.toCompletableFuture().get(15, TimeUnit.SECONDS);
serverAccepted = askResponse.isSuccess();
serverAccepted = TransportUtils.reliableTtReportInstanceStatus(reportInstanceStatusReq, currentServerAddress, workerRuntime.getTransporter());
} catch (Exception e) {
log.warn("[TaskTracker-{}] report finished status failed, req={}.", instanceId, reportInstanceStatusReq, e);
}

View File

@ -1,7 +1,5 @@
package tech.powerjob.worker.core.tracker.task.heavy;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.ToString;
@ -20,7 +18,7 @@ import tech.powerjob.common.response.AskResponse;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.persistence.TaskDO;
import java.time.Duration;
@ -229,22 +227,19 @@ public class CommonTaskTracker extends HeavyTaskTracker {
result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT;
}
String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath);
// 4. 执行完毕报告服务器
if (finished.get()) {
req.setResult(result);
// 上报追加的工作流上下文信息
req.setAppendedWfContext(appendedWfContext);
req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
reportFinalStatusThenDestroy(serverActor,req);
reportFinalStatusThenDestroy(workerRuntime, req);
return;
}
// 5. 未完成上报状态
req.setInstanceStatus(InstanceStatus.RUNNING.getV());
serverActor.tell(req, null);
TransportUtils.ttReportInstanceStatus(req, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter());
// 6.1 定期检查 -> 重试派发后未确认的任务
long currentMS = System.currentTimeMillis();

View File

@ -1,6 +1,5 @@
package tech.powerjob.worker.core.tracker.task.heavy;
import akka.actor.ActorSelection;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
@ -22,8 +21,8 @@ import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.LRUCache;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.persistence.TaskDO;
import java.util.*;
@ -358,13 +357,8 @@ public class FrequentTaskTracker extends HeavyTaskTracker {
log.warn("[FQTaskTracker-{}] report alert req,time:{}", instanceId, req.getReportTime());
}
String serverPath = AkkaUtils.getServerActorPath(currentServerAddress);
if (StringUtils.isEmpty(serverPath)) {
return;
}
// 非可靠通知Server挂掉后任务的kill工作交由其他线程去做
ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath);
serverActor.tell(req, null);
TransportUtils.ttReportInstanceStatus(req, currentServerAddress, workerRuntime.getTransporter());
}
/**

View File

@ -1,15 +1,13 @@
package tech.powerjob.worker.core.tracker.task.heavy;
import akka.actor.ActorSelection;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.request.WorkerQueryExecutorClusterReq;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.utils.CommonUtils;
@ -18,7 +16,7 @@ import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.common.utils.WorkflowContextUtils;
import tech.powerjob.worker.core.ha.ProcessorTrackerStatusHolder;
import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager;
@ -32,15 +30,10 @@ import com.google.common.base.Stopwatch;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -365,11 +358,9 @@ public abstract class HeavyTaskTracker extends TaskTracker {
// 1. 通知 ProcessorTracker 释放资源
TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
stopRequest.setInstanceId(instanceId);
ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> {
String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = workerRuntime.getActorSystem().actorSelection(ptPath);
ptStatusHolder.getAllProcessorTrackers().forEach(ptAddress -> {
// 不可靠通知ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭
ptActor.tell(stopRequest, null);
TransportUtils.ttStopPtInstance(stopRequest, ptAddress, workerRuntime.getTransporter());
});
// 2. 删除所有数据库数据
@ -425,9 +416,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
// 4. 任务派发
TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress());
String ptActorPath = AkkaUtils.getAkkaWorkerPath(processorTrackerAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = workerRuntime.getActorSystem().actorSelection(ptActorPath);
ptActor.tell(startTaskReq, null);
TransportUtils.ttStartPtTask(startTaskReq, processorTrackerAddress, workerRuntime.getTransporter());
log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName());
}
@ -524,18 +513,20 @@ public abstract class HeavyTaskTracker extends TaskTracker {
return;
}
String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
if (StringUtils.isEmpty(serverPath)) {
final String currentServerAddress = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
if (StringUtils.isEmpty(currentServerAddress)) {
log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId);
return;
}
WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(workerRuntime.getAppId(), instanceInfo.getJobId());
AskResponse response = AkkaUtils.easyAsk(workerRuntime.getActorSystem().actorSelection(serverPath), req);
if (!response.isSuccess()) {
log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage());
return;
}
try {
WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(workerRuntime.getAppId(), instanceInfo.getJobId());
AskResponse response = TransportUtils.reliableQueryJobCluster(req, currentServerAddress, workerRuntime.getTransporter());
if (!response.isSuccess()) {
log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage());
return;
}
List<String> workerList = JsonUtils.parseObject(response.getData(), new TypeReference<List<String>>() {});
ptStatusHolder.register(workerList);
} catch (Exception e) {

View File

@ -1,6 +1,5 @@
package tech.powerjob.worker.core.tracker.task.light;
import akka.actor.ActorSelection;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
@ -13,7 +12,7 @@ import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.core.processor.*;
import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager;
import tech.powerjob.worker.core.tracker.task.TaskTracker;
@ -249,8 +248,6 @@ public class LightTaskTracker extends TaskTracker {
log.info("[TaskTracker-{}] has been destroyed,final status is {},needn't to report status!", instanceId, status);
return;
}
String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath);
TaskTrackerReportInstanceStatusReq reportInstanceStatusReq = new TaskTrackerReportInstanceStatusReq();
reportInstanceStatusReq.setAppId(workerRuntime.getAppId());
reportInstanceStatusReq.setJobId(instanceInfo.getJobId());
@ -303,13 +300,13 @@ public class LightTaskTracker extends TaskTracker {
reportInstanceStatusReq.setEndTime(taskEndTime);
// 微操一下上报最终状态时重新设置下时间并且增加一小段偏移保证在并发上报运行中状态以及最终状态时最终状态的上报时间晚于运行中的状态
reportInstanceStatusReq.setReportTime(System.currentTimeMillis() + 1);
reportFinalStatusThenDestroy(serverActor, reportInstanceStatusReq);
reportFinalStatusThenDestroy(workerRuntime, reportInstanceStatusReq);
return;
}
// 未完成的任务只需要上报状态
reportInstanceStatusReq.setInstanceStatus(InstanceStatus.RUNNING.getV());
log.info("[TaskTracker-{}] report status({}) success,real status is {}", instanceId, reportInstanceStatusReq, status);
serverActor.tell(reportInstanceStatusReq, null);
TransportUtils.ttReportInstanceStatus(reportInstanceStatusReq, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter());
}
private void timeoutCheck() {

View File

@ -1,11 +1,9 @@
package tech.powerjob.worker.processor.impl;
import akka.actor.ActorSelection;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.container.OmsContainer;
import tech.powerjob.worker.container.OmsContainerFactory;
import tech.powerjob.worker.extension.processor.ProcessorBean;
@ -45,9 +43,7 @@ public class JarContainerProcessorFactory implements ProcessorFactory {
log.info("[ProcessorFactory] try to load processor({}) in container({})", className, containerName);
String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
ActorSelection actorSelection = workerRuntime.getActorSystem().actorSelection(serverPath);
OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerName), actorSelection);
OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerName), workerRuntime);
if (omsContainer != null) {
return new ProcessorBean()
.setProcessor(omsContainer.getProcessor(className))

View File

@ -1,70 +0,0 @@
# Akka configuration: SLF4J logging, the remote actor provider, message
# serialization, the Artery TCP transport and three named dispatchers.
akka {
# Akka's own log output goes through the SLF4J logger at WARNING level.
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "WARNING"
actor {
# The official documentation recommends the cluster provider; the remote provider is chosen here deliberately.
provider = remote
# Java serialization is switched off. Classes bound below are handled by the
# custom "power-serializer" (PowerAkkaSerializer) instead.
allow-java-serialization = off
serializers {
power-serializer = "tech.powerjob.common.serialize.PowerAkkaSerializer"
}
serialization-bindings {
"tech.powerjob.common.PowerSerializable" = power-serializer
}
}
remote {
artery {
# Transport choice is described under "Selecting a transport" in the Akka Artery documentation.
transport = tcp # See Selecting a transport below
# Overwritten by code.
canonical.hostname = "127.0.0.1"
canonical.port = 25520
}
}
# Named dispatchers.
# NOTE(review): which actors are assigned to each dispatcher is decided in code outside this file - confirm there.
task-tracker-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 4.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 64
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
# Same layout as task-tracker-dispatcher, but with a parallelism factor of 2.0 (capped to 2..64 threads).
processor-tracker-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 64
}
throughput = 10
}
# Same layout again, with a parallelism factor of 2.0 and a much lower cap (2..8 threads).
worker-common-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 8
}
throughput = 10
}
}

View File

@ -1,68 +0,0 @@
package tech.powerjob.worker.test;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.worker.PowerJobWorker;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.common.utils.NetUtils;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
 * Exercises the complete JobInstance execution flow against a worker started in this JVM.
 * <p>
 * Each job test sends a schedule request to the worker's TaskTracker actor and then sleeps
 * for a long time; nothing is asserted, so the outcome has to be observed manually.
 *
 * @author tjq
 * @since 2020/3/25
 */
public class CommonTaskTrackerTest {

    /** Sleep applied by {@link #justStartWorkerToTestServer()}, in milliseconds. */
    private static final long WORKER_ONLY_SLEEP_MILLIS = 277277277;

    /** Sleep applied after a schedule request has been sent, in milliseconds. */
    private static final long JOB_SLEEP_MILLIS = 5000000;

    private static ActorSelection remoteTaskTracker;

    /**
     * Starts a worker in test mode and resolves the selection of its TaskTracker actor.
     */
    @BeforeAll
    public static void init() throws Exception {
        startWorker();

        ActorSystem clientSystem = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
        String workerAddress = NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT;
        String taskTrackerPath = AkkaUtils.getAkkaWorkerPath(workerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
        remoteTaskTracker = clientSystem.actorSelection(taskTrackerPath);
    }

    /**
     * Only keeps the worker from {@link #init()} alive; no request is sent.
     */
    @Test
    public void justStartWorkerToTestServer() throws Exception {
        Thread.sleep(WORKER_ONLY_SLEEP_MILLIS);
    }

    @Test
    public void testStandaloneJob() throws Exception {
        scheduleCronJobAndSleep(ExecuteType.STANDALONE);
    }

    @Test
    public void testMapReduceJob() throws Exception {
        scheduleCronJobAndSleep(ExecuteType.MAP_REDUCE);
    }

    @Test
    public void testBroadcast() throws Exception {
        scheduleCronJobAndSleep(ExecuteType.BROADCAST);
    }

    /** Configures and initializes the worker used by every test in this class. */
    private static void startWorker() throws Exception {
        PowerJobWorkerConfig config = new PowerJobWorkerConfig();
        config.setAppName("oms-test");
        config.setServerAddress(Lists.newArrayList("127.0.0.1:7700"));
        config.setEnableTestMode(true);

        PowerJobWorker powerJobWorker = new PowerJobWorker();
        powerJobWorker.setConfig(config);
        powerJobWorker.init();
    }

    /** Sends a CRON schedule request of the given execute type without a sender, then sleeps. */
    private static void scheduleCronJobAndSleep(ExecuteType executeType) throws Exception {
        remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(executeType, TimeExpressionType.CRON), null);
        Thread.sleep(JOB_SLEEP_MILLIS);
    }
}

View File

@ -1,19 +1,10 @@
package tech.powerjob.worker.test;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.worker.PowerJobWorker;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.pojo.model.InstanceInfo;
import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import com.typesafe.config.ConfigFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
/**
* Starts the common services.
@ -23,30 +14,6 @@ import org.junit.jupiter.api.BeforeAll;
*/
public class CommonTest {
protected static ActorSelection remoteProcessorTracker;
protected static ActorSelection remoteTaskTracker;
@BeforeAll
public static void startWorker() throws Exception {
PowerJobWorkerConfig workerConfig = new PowerJobWorkerConfig();
workerConfig.setAppName("oms-test");
workerConfig.setEnableTestMode(true);
PowerJobWorker worker = new PowerJobWorker();
worker.setConfig(workerConfig);
worker.init();
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
String address = NetUtils.getLocalHost() + ":27777";
remoteProcessorTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME));
remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.TASK_TRACKER_ACTOR_NAME));
}
@AfterAll
public static void stop() throws Exception {
Thread.sleep(120000);
}
public static TaskTrackerStartTaskReq genTaskTrackerStartTaskReq(String processor) {

View File

@ -1,53 +0,0 @@
package tech.powerjob.worker.test;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.worker.PowerJobWorker;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.utils.AkkaUtils;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
 * Sends fixed-rate and fixed-delay schedule requests to the TaskTracker actor of a worker
 * started in this JVM.
 * <p>
 * Each test sleeps for a long time after sending its request and asserts nothing, so the
 * outcome has to be observed manually.
 *
 * @author tjq
 * @since 2020/4/9
 */
public class FrequentTaskTrackerTest {

    /** Sleep applied after a schedule request has been sent, in milliseconds. */
    private static final long JOB_SLEEP_MILLIS = 5000000;

    private static ActorSelection remoteTaskTracker;

    /**
     * Starts a worker and resolves the selection of its TaskTracker actor.
     */
    @BeforeAll
    public static void init() throws Exception {
        startWorker();

        ActorSystem clientSystem = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
        String workerAddress = NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT;
        String taskTrackerPath = AkkaUtils.getAkkaWorkerPath(workerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
        remoteTaskTracker = clientSystem.actorSelection(taskTrackerPath);
    }

    @Test
    public void testFixRateJob() throws Exception {
        scheduleJobAndSleep(ExecuteType.STANDALONE, TimeExpressionType.FIXED_RATE);
    }

    @Test
    public void testFixDelayJob() throws Exception {
        // NOTE(review): this uses MAP_REDUCE while the fixed-rate test uses STANDALONE - confirm that is intended.
        scheduleJobAndSleep(ExecuteType.MAP_REDUCE, TimeExpressionType.FIXED_DELAY);
    }

    /** Configures and initializes the worker used by every test in this class. */
    private static void startWorker() throws Exception {
        PowerJobWorkerConfig config = new PowerJobWorkerConfig();
        config.setAppName("oms-test");
        config.setServerAddress(Lists.newArrayList("127.0.0.1:7700"));

        PowerJobWorker powerJobWorker = new PowerJobWorker();
        powerJobWorker.setConfig(config);
        powerJobWorker.init();
    }

    /** Sends a schedule request for the given execute/time-expression pair without a sender, then sleeps. */
    private static void scheduleJobAndSleep(ExecuteType executeType, TimeExpressionType timeExpressionType) throws Exception {
        remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(executeType, timeExpressionType), null);
        Thread.sleep(JOB_SLEEP_MILLIS);
    }
}

View File

@ -1,18 +0,0 @@
# Akka configuration used for tests: remote provider over Artery TCP with the
# canonical address 127.0.0.1:25521.
# NOTE(review): presumably this is the "oms-akka-test.conf" loaded by the test classes - confirm.
akka {
actor {
# for test
provider = remote
# Java serialization is switched off; the binding below selects the serializer instead.
allow-java-serialization = off
serialization-bindings {
# NOTE(review): "OmsSerializable" is not a fully-qualified class name, unlike the usual form of
# serialization bindings - verify that this binding actually matches a class.
"OmsSerializable" = jackson-cbor
}
}
remote {
artery {
# Transport choice is described under "Selecting a transport" in the Akka Artery documentation.
transport = tcp # See Selecting a transport below
canonical.hostname = "127.0.0.1"
canonical.port = 25521
}
}
}