feat: replace akka by PowerJobRemoteEngine in server side

This commit is contained in:
tjq 2023-01-21 10:28:11 +08:00
parent 5a14b300f9
commit b013fbfefd
33 changed files with 530 additions and 610 deletions

View File

@ -29,31 +29,43 @@ public class RemoteConstant {
public static final String EMPTY_ADDRESS = "N/A";
public static final long DEFAULT_TIMEOUT_MS = 5000;
/* ************************ SERVER ************************ */
public static final String SERVER_PATH = "server";
/* ************************ SERVER-self_side (s4s == server for server side) ************************ */
public static final String S4S_PATH = "friend";
/**
* server 集群间的心跳处理
*/
public static final String S4S_HANDLER_PING = "ping";
/**
* 处理其他 server 的执行请求
*/
public static final String S4S_HANDLER_PROCESS = "process";
/* ************************ SERVER-worker_sides4w == server for worker side ************************ */
public static final String S4W_PATH = "server";
/**
* server 处理在线日志
*/
public static final String SERVER_HANDLER_REPORT_LOG = "reportLog";
public static final String S4W_HANDLER_REPORT_LOG = "reportLog";
/**
* server 处理 worker 心跳
*/
public static final String SERVER_HANDLER_WORKER_HEARTBEAT = "workerHeartbeat";
public static final String S4W_HANDLER_WORKER_HEARTBEAT = "workerHeartbeat";
/**
* server 处理 TaskTracker 上报的任务实例状态
*/
public static final String SERVER_HANDLER_REPORT_INSTANCE_STATUS = "reportInstanceStatus";
public static final String S4W_HANDLER_REPORT_INSTANCE_STATUS = "reportInstanceStatus";
/**
* server 查询任务的可执行集群
*/
public static final String SERVER_HANDLER_QUERY_JOB_CLUSTER = "queryJobCluster";
public static final String S4W_HANDLER_QUERY_JOB_CLUSTER = "queryJobCluster";
/**
* server 处理 worker 请求部署容器命令
*/
public static final String SERVER_HANDLER_WORKER_NEED_DEPLOY_CONTAINER = "container";
public static final String S4W_HANDLER_WORKER_NEED_DEPLOY_CONTAINER = "queryContainer";
/* ************************ Worker-TaskTracker ************************ */
public static final String WTT_PATH = "taskTracker";

View File

@ -22,7 +22,8 @@ public class AkkaMappingService {
private static final Map<String, ActorConfig> RP_2_ACTOR_CFG = Maps.newHashMap();
static {
addMappingRule(RemoteConstant.SERVER_PATH, "server_actor", null);
addMappingRule(RemoteConstant.S4W_PATH, "server_actor", "w-r-c-d");
addMappingRule(RemoteConstant.S4S_PATH, "friend_actor", "friend-request-actor-dispatcher");
}
private static final String DEFAULT_DISPATCH_NAME = "common-dispatcher";

View File

@ -55,9 +55,15 @@ public class AkkaProxyActor extends AbstractActor {
try {
final Object ret = handlerInfo.getMethod().invoke(actorInfo.getActor(), req);
if (ret != null) {
getSender().tell(ret, getSelf());
if (ret == null) {
return;
}
if (ret instanceof Optional) {
if (!((Optional<?>) ret).isPresent()) {
return;
}
}
getSender().tell(ret, getSelf());
} catch (Exception e) {
log.error("[PowerJob-AKKA] process failed!", e);
}

View File

@ -28,7 +28,7 @@
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.7.4</springboot.version>
<powerjob.common.version>4.2.1</powerjob.common.version>
<!-- MySQL version that corresponds to spring-boot-dependencies version. -->
<mysql.version>8.0.30</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version>
@ -50,6 +50,8 @@
<groovy.version>3.0.10</groovy.version>
<cron-utils.version>9.1.6</cron-utils.version>
<powerjob-remote-impl-http.version>4.2.1</powerjob-remote-impl-http.version>
<powerjob-remote-impl-akka.version>4.2.1</powerjob-remote-impl-akka.version>
</properties>
<dependencyManagement>
@ -99,11 +101,16 @@
<dependencies>
<!-- oms-common -->
<!-- 网络层 -->
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-common</artifactId>
<version>${powerjob.common.version}</version>
<artifactId>powerjob-remote-impl-http</artifactId>
<version>${powerjob-remote-impl-http.version}</version>
</dependency>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-remote-impl-akka</artifactId>
<version>${powerjob-remote-impl-akka.version}</version>
</dependency>
<!-- mysql -->
@ -267,6 +274,12 @@
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>${swagger.version}</version>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- swagger2 ui -->
<dependency>

View File

@ -11,6 +11,7 @@ import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.*;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.common.Holder;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.core.instance.InstanceManager;
@ -19,7 +20,8 @@ import tech.powerjob.server.core.lock.UseCacheLock;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.remote.transport.TransportService;
import tech.powerjob.server.remote.tp.ServerURLFactory;
import tech.powerjob.server.remote.tp.TransportService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import java.util.ArrayList;
@ -165,7 +167,8 @@ public class DispatchService {
WorkerInfo taskTracker = suitableWorkers.get(0);
String taskTrackerAddress = taskTracker.getAddress();
transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req);
URL workerUrl = ServerURLFactory.dispatchJob2Worker(taskTrackerAddress);
transportService.tell(taskTracker.getProtocol(), workerUrl, req);
log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req);
// 修改状态

View File

@ -1,25 +1,5 @@
package tech.powerjob.server.core.container;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.common.model.GitRepoInfo;
import tech.powerjob.common.request.ServerDeployContainerRequest;
import tech.powerjob.common.request.ServerDestroyContainerRequest;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.server.common.constants.ContainerSourceType;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
import tech.powerjob.server.persistence.mongodb.GridFsManager;
import tech.powerjob.server.remote.transport.TransportService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import tech.powerjob.server.common.module.WorkerInfo;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
@ -28,6 +8,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.maven.shared.invoker.DefaultInvocationRequest;
@ -43,8 +24,29 @@ import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.multipart.MultipartFile;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.common.model.GitRepoInfo;
import tech.powerjob.common.request.ServerDeployContainerRequest;
import tech.powerjob.common.request.ServerDestroyContainerRequest;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.common.constants.ContainerSourceType;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.mongodb.GridFsManager;
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.remote.tp.ServerURLFactory;
import tech.powerjob.server.remote.tp.TransportService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import javax.annotation.Resource;
import javax.websocket.RemoteEndpoint;
@ -128,7 +130,8 @@ public class ContainerService {
ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId());
workerClusterQueryService.getAllAliveWorkers(container.getAppId()).forEach(workerInfo -> {
transportService.tell(Protocol.AKKA, workerInfo.getAddress(), destroyRequest);
final URL url = ServerURLFactory.destroyContainer2Worker(workerInfo.getAddress());
transportService.tell(workerInfo.getProtocol(), url, destroyRequest);
});
log.info("[ContainerService] delete container: {}.", container);
@ -247,11 +250,8 @@ public class ContainerService {
containerInfoRepository.saveAndFlush(container);
// 开始部署需要分批进行
Set<String> workerAddressList = workerClusterQueryService.getAllAliveWorkers(container.getAppId())
.stream()
.map(WorkerInfo::getAddress)
.collect(Collectors.toSet());
if (workerAddressList.isEmpty()) {
final List<WorkerInfo> allAliveWorkers = workerClusterQueryService.getAllAliveWorkers(container.getAppId());
if (allAliveWorkers.isEmpty()) {
remote.sendText("SYSTEM: there is no worker available now, deploy failed!");
return;
}
@ -262,10 +262,12 @@ public class ContainerService {
long sleepTime = calculateSleepTime(jarFile.length());
AtomicInteger count = new AtomicInteger();
workerAddressList.forEach(akkaAddress -> {
transportService.tell(Protocol.AKKA, akkaAddress, req);
allAliveWorkers.forEach(workerInfo -> {
remote.sendText("SYSTEM: send deploy request to " + akkaAddress);
final URL url = ServerURLFactory.deployContainer2Worker(workerInfo.getAddress());
transportService.tell(workerInfo.getProtocol(), url, req);
remote.sendText("SYSTEM: send deploy request to " + url.getAddress());
if (count.incrementAndGet() % DEPLOY_BATCH_NUM == 0) {
CommonUtils.executeIgnoreException(() -> Thread.sleep(sleepTime));
@ -285,6 +287,7 @@ public class ContainerService {
* @param containerId 容器ID
* @return 拼接好的可阅读字符串
*/
@DesignateServer
public String fetchDeployedInfo(Long appId, Long containerId) {
List<DeployedContainerInfo> infoList = workerClusterQueryService.getDeployedContainerInfos(appId, containerId);

View File

@ -2,21 +2,27 @@ package tech.powerjob.server.core.handler;
import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.ProcessType;
import tech.powerjob.server.remote.actoes.ServerActor;
import java.util.Optional;
import static tech.powerjob.common.RemoteConstant.*;
/**
* 定义 server worker 之间需要处理的协议
*
* @author tjq
* @since 2022/9/10
*/
public interface IWorkerRequestHandler {
public interface IWorkerRequestHandler extends ServerActor {
/**
* 处理 worker 上报的心跳信息
* @param heartbeat 心跳信息
*/
@Handler(path = S4W_HANDLER_WORKER_HEARTBEAT, processType = ProcessType.NO_BLOCKING)
void processWorkerHeartbeat(WorkerHeartbeat heartbeat);
/**
@ -24,6 +30,7 @@ public interface IWorkerRequestHandler {
* @param req 上报请求
* @return 响应信息
*/
@Handler(path = S4W_HANDLER_REPORT_INSTANCE_STATUS, processType = ProcessType.BLOCKING)
Optional<AskResponse> processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req);
/**
@ -31,12 +38,14 @@ public interface IWorkerRequestHandler {
* @param req 请求
* @return cluster info
*/
@Handler(path = S4W_HANDLER_QUERY_JOB_CLUSTER, processType = ProcessType.BLOCKING)
AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req);
/**
* 处理 worker 日志推送请求
* 处理 worker 日志推送请求内部使用线程池异步处理非阻塞
* @param req 请求
*/
@Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
void processWorkerLogReport(WorkerLogReportReq req);
/**
@ -44,5 +53,6 @@ public interface IWorkerRequestHandler {
* @param request 请求
* @return 容器部署信息
*/
@Handler(path = S4W_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, processType = ProcessType.BLOCKING)
AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest request);
}

View File

@ -4,11 +4,13 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.request.WorkerHeartbeat;
import tech.powerjob.common.request.WorkerLogReportReq;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.server.core.instance.InstanceLogService;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
@ -30,6 +32,7 @@ import java.util.Optional;
*/
@Slf4j
@Component
@Actor(path = RemoteConstant.S4W_PATH)
public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler {
private final InstanceManager instanceManager;

View File

@ -1,28 +0,0 @@
package tech.powerjob.server.core.handler.impl;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.remote.transport.starter.VertXStarter;
import javax.annotation.PostConstruct;
/**
* 初始化器
*
* @author tjq
* @since 2022/9/11
*/
@Component
@ConditionalOnExpression("'${execution.env}'!='test'")
public class Initializer {
@PostConstruct
public void initHandler() {
// init akka
AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME);
// init vert.x
VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler());
}
}

View File

@ -2,15 +2,16 @@ package tech.powerjob.server.core.instance;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.request.ServerStopInstanceReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.common.timewheel.holder.HashedWheelTimerHolder;
import tech.powerjob.server.common.utils.SpringUtils;
@ -22,10 +23,10 @@ import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.remote.transport.TransportService;
import tech.powerjob.server.remote.tp.ServerURLFactory;
import tech.powerjob.server.remote.tp.TransportService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Optional;
@ -176,7 +177,8 @@ public class InstanceManager {
if (workerInfoOpt.isPresent()) {
ServerStopInstanceReq stopInstanceReq = new ServerStopInstanceReq(instanceId);
WorkerInfo workerInfo = workerInfoOpt.get();
transportService.tell(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), stopInstanceReq);
final URL url = ServerURLFactory.stopInstance2Worker(workerInfo.getAddress());
transportService.tell(workerInfo.getProtocol(), url, stopInstanceReq);
}
}

View File

@ -5,15 +5,16 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerQueryInstanceStatusReq;
import tech.powerjob.common.request.ServerStopInstanceReq;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.response.InstanceInfoDTO;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.common.constants.InstanceType;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.common.timewheel.TimerFuture;
@ -26,12 +27,14 @@ import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.remote.transport.TransportService;
import tech.powerjob.server.remote.tp.ServerURLFactory;
import tech.powerjob.server.remote.tp.TransportService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static tech.powerjob.common.enums.InstanceStatus.RUNNING;
@ -136,7 +139,7 @@ public class InstanceService {
if (workerInfoOpt.isPresent()) {
ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId);
WorkerInfo workerInfo = workerInfoOpt.get();
transportService.tell(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), req);
transportService.tell(workerInfo.getProtocol(), ServerURLFactory.stopInstance2Worker(workerInfo.getAddress()), req);
log.info("[Instance-{}] update instanceInfo and send 'stopInstance' request succeed.", instanceId);
} else {
log.warn("[Instance-{}] update instanceInfo successfully but can't find TaskTracker to stop instance", instanceId);
@ -280,7 +283,10 @@ public class InstanceService {
WorkerInfo workerInfo = workerInfoOpt.get();
ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId);
try {
AskResponse askResponse = transportService.ask(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), req);
final URL url = ServerURLFactory.queryInstance2Worker(workerInfo.getAddress());
AskResponse askResponse = transportService.ask(workerInfo.getProtocol(), url, req, AskResponse.class)
.toCompletableFuture()
.get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (askResponse.isSuccess()) {
InstanceDetail instanceDetail = askResponse.getData(InstanceDetail.class);
instanceDetail.setRunningTimes(instanceInfoDO.getRunningTimes());

View File

@ -22,9 +22,8 @@ import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.brief.BriefInstanceInfo;
import tech.powerjob.server.persistence.remote.repository.*;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.remote.tp.TransportService;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
@ -49,6 +48,8 @@ public class InstanceStatusCheckService {
public static final long CHECK_INTERVAL = 10000;
private final TransportService transportService;
private final DispatchService dispatchService;
private final InstanceManager instanceManager;
@ -61,7 +62,6 @@ public class InstanceStatusCheckService {
private final InstanceInfoRepository instanceInfoRepository;
private final WorkflowInfoRepository workflowInfoRepository;
private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@ -69,7 +69,7 @@ public class InstanceStatusCheckService {
public void checkWorkflowInstance() {
Stopwatch stopwatch = Stopwatch.createStarted();
// 查询 DB 获取该 Server 需要负责的 AppGroup
List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
if (CollectionUtils.isEmpty(allAppIds)) {
log.info("[InstanceStatusChecker] current server has no app's job to check");
return;
@ -89,7 +89,7 @@ public class InstanceStatusCheckService {
public void checkWaitingDispatchInstance() {
Stopwatch stopwatch = Stopwatch.createStarted();
// 查询 DB 获取该 Server 需要负责的 AppGroup
List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
if (CollectionUtils.isEmpty(allAppIds)) {
log.info("[InstanceStatusChecker] current server has no app's job to check");
return;
@ -110,7 +110,7 @@ public class InstanceStatusCheckService {
public void checkWaitingWorkerReceiveInstance() {
Stopwatch stopwatch = Stopwatch.createStarted();
// 查询 DB 获取该 Server 需要负责的 AppGroup
List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
if (CollectionUtils.isEmpty(allAppIds)) {
log.info("[InstanceStatusChecker] current server has no app's job to check");
return;
@ -131,7 +131,7 @@ public class InstanceStatusCheckService {
public void checkRunningInstance() {
Stopwatch stopwatch = Stopwatch.createStarted();
// 查询 DB 获取该 Server 需要负责的 AppGroup
List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
if (CollectionUtils.isEmpty(allAppIds)) {
log.info("[InstanceStatusChecker] current server has no app's job to check");
return;

View File

@ -23,10 +23,9 @@ import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.remote.tp.TransportService;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import javax.annotation.Resource;
import java.util.*;
/**
@ -47,6 +46,7 @@ public class PowerScheduleService {
*/
private static final int MAX_APP_NUM = 10;
private final TransportService transportService;
private final DispatchService dispatchService;
private final InstanceService instanceService;
@ -72,7 +72,7 @@ public class PowerScheduleService {
long start = System.currentTimeMillis();
// 调度 CRON 表达式 JOB
try {
final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
if (CollectionUtils.isEmpty(allAppIds)) {
log.info("[CronJobSchedule] current server has no app's job to schedule.");
return;
@ -92,7 +92,7 @@ public class PowerScheduleService {
long start = System.currentTimeMillis();
// 调度 CRON 表达式 WORKFLOW
try {
final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
if (CollectionUtils.isEmpty(allAppIds)) {
log.info("[CronWorkflowSchedule] current server has no app's workflow to schedule.");
return;
@ -113,7 +113,7 @@ public class PowerScheduleService {
long start = System.currentTimeMillis();
// 调度 FIX_RATE/FIX_DELAY 表达式 JOB
try {
final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
if (CollectionUtils.isEmpty(allAppIds)) {
log.info("[FrequentJobSchedule] current server has no app's job to schedule.");
return;
@ -132,7 +132,7 @@ public class PowerScheduleService {
public void cleanData() {
try {
final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
if (allAppIds.isEmpty()) {
return;
}

View File

@ -0,0 +1,10 @@
package tech.powerjob.server.remote.actoes;
/**
* ServerActor 声明接口
*
* @author tjq
* @since 2023/1/21
*/
public interface ServerActor {
}

View File

@ -0,0 +1,52 @@
package tech.powerjob.server.remote.server;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.ProcessType;
import tech.powerjob.server.remote.actoes.ServerActor;
import tech.powerjob.server.remote.server.election.Ping;
import tech.powerjob.server.remote.server.redirector.RemoteProcessReq;
import tech.powerjob.server.remote.server.redirector.RemoteRequestProcessor;
import static tech.powerjob.common.RemoteConstant.*;
/**
* 处理朋友们的信息处理服务器与服务器之间的通讯
*
* @author tjq
* @since 2020/4/9
*/
@Slf4j
@Component
@Handler(path = S4S_PATH)
public class FriendActor implements ServerActor {
private static final String SK = "dGVuZ2ppcWlAZ21haWwuY29tIA==";
/**
* 处理存活检测的请求
*/
@Handler(path = S4S_HANDLER_PING, processType = ProcessType.NO_BLOCKING)
public AskResponse onReceivePing(Ping ping) {
return AskResponse.succeed(SK);
}
@Handler(path = S4S_HANDLER_PROCESS, processType = ProcessType.BLOCKING)
private AskResponse onReceiveRemoteProcessReq(RemoteProcessReq req) {
AskResponse response = new AskResponse();
response.setSuccess(true);
try {
response.setData(JsonUtils.toBytes(RemoteRequestProcessor.processRemoteRequest(req)));
} catch (Throwable t) {
log.error("[FriendActor] process remote request[{}] failed!", req, t);
response.setSuccess(false);
response.setMessage(ExceptionUtils.getMessage(t));
}
return response;
}
}

View File

@ -1,86 +0,0 @@
package tech.powerjob.server.remote.server;
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.routing.DefaultResizer;
import akka.routing.RoundRobinPool;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.server.remote.server.election.Ping;
import tech.powerjob.server.remote.server.redirector.RemoteProcessReq;
import tech.powerjob.server.remote.server.redirector.RemoteRequestProcessor;
import tech.powerjob.server.remote.transport.TransportService;
/**
* 处理朋友们的信息处理服务器与服务器之间的通讯
*
* @author tjq
* @since 2020/4/9
*/
@Slf4j
public class FriendRequestHandler extends AbstractActor {
public static Props defaultProps() {
return Props.create(FriendRequestHandler.class)
.withDispatcher("akka.friend-request-actor-dispatcher")
.withRouter(
new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)
.withResizer(new DefaultResizer(
Runtime.getRuntime().availableProcessors() * 4,
Runtime.getRuntime().availableProcessors() * 10,
1,
0.2d,
0.3d,
0.1d,
10
))
);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Ping.class, this::onReceivePing)
.match(RemoteProcessReq.class, this::onReceiveRemoteProcessReq)
.matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj))
.build();
}
@Override
public void preStart() throws Exception {
super.preStart();
log.debug("[FriendRequestHandler]init FriendRequestActor");
}
@Override
public void postStop() throws Exception {
super.postStop();
log.debug("[FriendRequestHandler]stop FriendRequestActor");
}
/**
* 处理存活检测的请求
*/
private void onReceivePing(Ping ping) {
getSender().tell(AskResponse.succeed(TransportService.getAllAddress()), getSelf());
}
private void onReceiveRemoteProcessReq(RemoteProcessReq req) {
AskResponse response = new AskResponse();
response.setSuccess(true);
try {
response.setData(JsonUtils.toBytes(RemoteRequestProcessor.processRemoteRequest(req)));
} catch (Throwable t) {
log.error("[FriendActor] process remote request[{}] failed!", req, t);
response.setSuccess(false);
response.setMessage(ExceptionUtils.getMessage(t));
}
getSender().tell(response, getSelf());
}
}

View File

@ -1,29 +1,23 @@
package tech.powerjob.server.remote.server.election;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.alibaba.fastjson.JSONObject;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.remote.transport.TransportService;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.remote.tp.ServerURLFactory;
import tech.powerjob.server.remote.tp.TransportService;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Date;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@ -59,7 +53,7 @@ public class ServerElectionService {
public String elect(Long appId, String protocol, String currentServer) {
if (!accurate()) {
// 如果是本机就不需要查数据库那么复杂的操作了直接返回成功
if (getProtocolServerAddress(protocol).equals(currentServer)) {
if (transportService.defaultProtocol().getAddress().equals(currentServer)) {
return currentServer;
}
}
@ -104,13 +98,14 @@ public class ServerElectionService {
}
// 篡位本机作为Server
// 注意写入 AppInfoDO#currentServer 的永远是 ActorSystem 的地址仅在返回的时候特殊处理
appInfo.setCurrentServer(transportService.getTransporter(Protocol.AKKA).getAddress());
// 注意写入 AppInfoDO#currentServer 的永远是 ActorSystem 的地址仅在返回的时候特殊处理 (4.3.0 更改为 HTTP)
final String selfDefaultAddress = transportService.defaultProtocol().getAddress();
appInfo.setCurrentServer(selfDefaultAddress);
appInfo.setGmtModified(new Date());
appInfoRepository.saveAndFlush(appInfo);
log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
return getProtocolServerAddress(protocol);
return selfDefaultAddress;
}catch (Exception e) {
log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
}finally {
@ -139,16 +134,18 @@ public class ServerElectionService {
Ping ping = new Ping();
ping.setCurrentTime(System.currentTimeMillis());
ActorSelection serverActor = AkkaStarter.getFriendActor(serverAddress);
URL targetUrl = ServerURLFactory.ping2Friend(serverAddress);
try {
CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
downServerCache.remove(serverAddress);
AskResponse response = transportService.ask(Protocol.HTTP.name(), targetUrl, ping, AskResponse.class)
.toCompletableFuture()
.get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (response.isSuccess()) {
return JsonUtils.parseObject(response.getData(), JSONObject.class).getString(protocol);
log.info("[ServerElection] server[{}] is active, it will be the master.", serverAddress);
downServerCache.remove(serverAddress);
return serverAddress;
}
}catch (Exception e) {
log.warn("[ServerElection] server({}) was down.", serverAddress);
log.warn("[ServerElection] server[{}] was down.", serverAddress);
}
downServerCache.add(serverAddress);
return null;
@ -157,9 +154,4 @@ public class ServerElectionService {
private boolean accurate() {
return ThreadLocalRandom.current().nextInt(100) < accurateSelectServerPercentage;
}
private String getProtocolServerAddress(String protocol) {
Protocol pt = Protocol.of(protocol);
return TransportService.getAllAddress().get(pt);
}
}

View File

@ -1,17 +1,9 @@
package tech.powerjob.server.remote.server.redirector;
import akka.pattern.Patterns;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import lombok.RequiredArgsConstructor;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.response.AskResponse;
import org.springframework.core.annotation.Order;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
@ -19,16 +11,25 @@ import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.remote.tp.ServerURLFactory;
import tech.powerjob.server.remote.tp.TransportService;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* 指定服务器运行切面
@ -43,6 +44,7 @@ import java.util.concurrent.CompletionStage;
@RequiredArgsConstructor
public class DesignateServerAspect {
private final TransportService transportService;
private final AppInfoRepository appInfoRepository;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -84,7 +86,7 @@ public class DesignateServerAspect {
}
// 目标IP与本地符合则本地执行
if (Objects.equals(targetServer, AkkaStarter.getActorSystemAddress())) {
if (Objects.equals(targetServer, transportService.defaultProtocol())) {
return point.proceed();
}
@ -96,8 +98,10 @@ public class DesignateServerAspect {
.setParameterTypes(parameterTypes)
.setArgs(args);
CompletionStage<Object> askCS = Patterns.ask(AkkaStarter.getFriendActor(targetServer), remoteProcessReq, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get();
final URL friendUrl = ServerURLFactory.process2Friend(targetServer);
CompletionStage<AskResponse> askCS = transportService.ask(Protocol.HTTP.name(), friendUrl, remoteProcessReq, AskResponse.class);
AskResponse askResponse = askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (!askResponse.isSuccess()) {
throw new PowerJobException("remote process failed: " + askResponse.getMessage());

View File

@ -0,0 +1,169 @@
package tech.powerjob.server.remote.tp;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.engine.EngineConfig;
import tech.powerjob.remote.framework.engine.EngineOutput;
import tech.powerjob.remote.framework.engine.RemoteEngine;
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
import tech.powerjob.server.remote.actoes.ServerActor;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
/**
* server 数据传输服务
*
* @author tjq
* @since 2023/1/21
*/
@Slf4j
@Service
public class PowerTransportService implements TransportService, InitializingBean {
@Value("${oms.transporter.active.protocols}")
private String activeProtocols;
private static final String PROTOCOL_PORT_CONFIG = "oms.%s.port";
private final Environment environment;
private final List<ServerActor> serverActors;
private ProtocolInfo defaultProtocol;
private final Map<String, ProtocolInfo> protocol2Transporter = Maps.newHashMap();
public PowerTransportService(List<ServerActor> serverActors, Environment environment) {
this.serverActors = serverActors;
this.environment = environment;
}
@Override
public ProtocolInfo defaultProtocol() {
return defaultProtocol;
}
private ProtocolInfo fetchProtocolInfo(String protocol) {
// 兼容老版 worker 未上报 protocol 的情况
protocol = compatibleProtocol(protocol);
final ProtocolInfo protocolInfo = protocol2Transporter.get(protocol);
if (protocolInfo == null) {
throw new IllegalArgumentException("can't find Transporter by protocol :" + protocol);
}
return protocolInfo;
}
@Override
public void tell(String protocol, URL url, PowerSerializable request) {
fetchProtocolInfo(protocol).getTransporter().tell(url, request);
}
@Override
public <T> CompletionStage<T> ask(String protocol, URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
return fetchProtocolInfo(protocol).getTransporter().ask(url, request, clz);
}
private void initRemoteFrameWork(String protocol, int port) {
Address address = new Address()
.setHost(NetUtils.getLocalHost())
.setPort(port);
EngineConfig engineConfig = new EngineConfig()
.setServerType(ServerType.SERVER)
.setType(protocol.toUpperCase())
.setBindAddress(address)
.setActorList(Lists.newArrayList(serverActors));
log.info("[PowerTransportService] start to initialize RemoteEngine[type={},address={}]", protocol, address);
RemoteEngine re = new PowerJobRemoteEngine();
final EngineOutput engineOutput = re.start(engineConfig);
log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address);
this.protocol2Transporter.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter()));
}
@Override
public void afterPropertiesSet() throws Exception {
log.info("[PowerTransportService] start to initialize whole PowerTransportService!");
log.info("[PowerTransportService] activeProtocols: {}", activeProtocols);
if (StringUtils.isEmpty(activeProtocols)) {
throw new IllegalArgumentException("activeProtocols can't be empty!");
}
for (String protocol : activeProtocols.split(OmsConstant.COMMA)) {
try {
final int port = parseProtocolPort(protocol);
initRemoteFrameWork(protocol, port);
} catch (Throwable t) {
log.error("[PowerTransportService] initialize protocol[{}] failed. If you don't need to use this protocol, you can turn it off by 'oms.transporter.active.protocols'", protocol);
ExceptionUtils.rethrow(t);
}
}
choseDefault();
log.info("[PowerTransportService] initialize successfully!");
log.info("[PowerTransportService] ALL_PROTOCOLS: {}", protocol2Transporter);
}
/**
* 获取协议端口考虑兼容性 & 用户仔细扩展的场景选择动态从 env 获取 port
* @return port
*/
private int parseProtocolPort(String protocol) {
final String key1 = String.format(PROTOCOL_PORT_CONFIG, protocol.toLowerCase());
final String key2 = String.format(PROTOCOL_PORT_CONFIG, protocol.toUpperCase());
String portStr = environment.getProperty(key1);
if (StringUtils.isEmpty(portStr)) {
portStr = environment.getProperty(key2);
}
log.info("[PowerTransportService] fetch port for protocol[{}], key={}, value={}", protocol, key1, portStr);
if (StringUtils.isEmpty(portStr)) {
throw new IllegalArgumentException(String.format("can't find protocol config by key: %s, please check your spring config!", key1));
}
return Integer.parseInt(portStr);
}
private String compatibleProtocol(String p) {
if (p == null) {
return Protocol.AKKA.name();
}
return p;
}
/**
* HTTP 优先否则默认取第一个协议
*/
private void choseDefault() {
ProtocolInfo httpP = protocol2Transporter.get(Protocol.HTTP.name());
if (httpP != null) {
log.info("[PowerTransportService] exist HTTP protocol, chose this as the default protocol!");
this.defaultProtocol = httpP;
return;
}
String firstProtocol = activeProtocols.split(OmsConstant.COMMA)[0];
this.defaultProtocol = this.protocol2Transporter.get(firstProtocol);
log.info("[PowerTransportService] chose [{}] as the default protocol!", firstProtocol);
if (this.defaultProtocol == null) {
throw new IllegalArgumentException("can't find default protocol, please check your config!");
}
}
}

View File

@ -0,0 +1,28 @@
package tech.powerjob.server.remote.tp;
import lombok.Getter;
import lombok.ToString;
import tech.powerjob.remote.framework.transporter.Transporter;
/**
* ProtocolInfo
*
* @author tjq
* @since 2023/1/21
*/
@Getter
@ToString
public class ProtocolInfo {
private final String protocol;
private final String address;
private final transient Transporter transporter;
public ProtocolInfo(String protocol, String address, Transporter transporter) {
this.protocol = protocol;
this.address = address;
this.transporter = transporter;
}
}

View File

@ -0,0 +1,52 @@
package tech.powerjob.server.remote.tp;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.HandlerLocation;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.base.URL;
import static tech.powerjob.common.RemoteConstant.*;
/**
* 统一生成地址
*
* @author tjq
* @since 2023/1/21
*/
public class ServerURLFactory {
public static URL dispatchJob2Worker(String address) {
return simileBuild(address, ServerType.WORKER, WORKER_PATH, WTT_HANDLER_RUN_JOB);
}
public static URL stopInstance2Worker(String address) {
return simileBuild(address, ServerType.WORKER, WORKER_PATH, WTT_HANDLER_STOP_INSTANCE);
}
public static URL queryInstance2Worker(String address) {
return simileBuild(address, ServerType.WORKER, WORKER_PATH, WTT_HANDLER_QUERY_INSTANCE_STATUS);
}
public static URL deployContainer2Worker(String address) {
return simileBuild(address, ServerType.WORKER, WORKER_PATH, WORKER_HANDLER_DEPLOY_CONTAINER);
}
public static URL destroyContainer2Worker(String address) {
return simileBuild(address, ServerType.WORKER, WORKER_PATH, WORKER_HANDLER_DESTROY_CONTAINER);
}
public static URL ping2Friend(String address) {
return simileBuild(address, ServerType.SERVER, S4S_PATH, S4S_HANDLER_PING);
}
public static URL process2Friend(String address) {
return simileBuild(address, ServerType.SERVER, S4S_PATH, S4S_HANDLER_PROCESS);
}
public static URL simileBuild(String address, ServerType type, String rootPath, String handlerPath) {
return new URL()
.setServerType(type)
.setAddress(Address.fromIpv4(address))
.setLocation(new HandlerLocation().setRootPath(rootPath).setMethodPath(handlerPath));
}
}

View File

@ -0,0 +1,29 @@
package tech.powerjob.server.remote.tp;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.URL;
import java.util.concurrent.CompletionStage;
/**
* server 数据传输服务
*
* @author tjq
* @since 2023/1/21
*/
public interface TransportService {
/**
* 自用地址用于维护 server -> appId server 间通讯
* 4.3.0 前为 ActorSystem Addressip:10086
* 4.3.0 PowerJob 将主协议切换为 HTTP使用 HTTP address ip:10010
* @return 自用地址
*/
ProtocolInfo defaultProtocol();
void tell(String protocol, URL url, PowerSerializable request);
<T> CompletionStage<T> ask(String protocol, URL url, PowerSerializable request, Class<T> clz) throws RemotingException;
}

View File

@ -1,66 +0,0 @@
package tech.powerjob.server.remote.transport;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.response.AskResponse;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* TransportService
*
* @author tjq
* @since 2021/2/7
*/
@Slf4j
@Service
public class TransportService {
private static final Map<Protocol, String> protocol2Address = Maps.newHashMap();
@Getter
private final Map<Protocol, Transporter> protocol2Transporter = Maps.newConcurrentMap();
@Autowired
public TransportService(List<Transporter> transporters) {
transporters.forEach(t -> {
log.info("[TransportService] Transporter[protocol:{},address:{}] registration successful!", t.getProtocol(), t.getAddress());
protocol2Transporter.put(t.getProtocol(), t);
protocol2Address.put(t.getProtocol(), t.getAddress());
});
}
public void tell(Protocol protocol, String address, PowerSerializable object) {
getTransporter(protocol).tell(address, object);
}
public AskResponse ask(Protocol protocol, String address, PowerSerializable object) throws Exception {
return getTransporter(protocol).ask(address, object);
}
public Transporter getTransporter(Protocol protocol) {
Transporter transporter = protocol2Transporter.get(protocol);
if (transporter == null) {
log.error("[TransportService] can't find transporter by protocol[{}], this is a bug!", protocol);
throw new UnknownProtocolException("can't find transporter by protocol: " + protocol);
}
return transporter;
}
public static class UnknownProtocolException extends RuntimeException {
public UnknownProtocolException(String message) {
super(message);
}
}
public static Map<Protocol, String> getAllAddress() {
return protocol2Address;
}
}

View File

@ -1,22 +0,0 @@
package tech.powerjob.server.remote.transport;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.response.AskResponse;
/**
* Transporter
*
* @author tjq
* @since 2021/2/7
*/
public interface Transporter {
Protocol getProtocol();
String getAddress();
void tell(String address, PowerSerializable object);
AskResponse ask(String address, PowerSerializable object) throws Exception;
}

View File

@ -1,48 +0,0 @@
package tech.powerjob.server.remote.transport.impl;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.server.remote.transport.Transporter;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* akka transporter
*
* @author tjq
* @since 2021/2/7
*/
@Service
public class AkkaTransporter implements Transporter {
@Override
public Protocol getProtocol() {
return Protocol.AKKA;
}
@Override
public String getAddress() {
return AkkaStarter.getActorSystemAddress();
}
@Override
public void tell(String address, PowerSerializable object) {
ActorSelection taskTrackerActor = AkkaStarter.getWorkerActor(address);
taskTrackerActor.tell(object, null);
}
@Override
public AskResponse ask(String address, PowerSerializable object) throws Exception {
ActorSelection taskTrackerActor = AkkaStarter.getWorkerActor(address);
CompletionStage<Object> askCS = Patterns.ask(taskTrackerActor, object, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
return (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
}

View File

@ -1,73 +0,0 @@
package tech.powerjob.server.remote.transport.impl;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.remote.transport.Transporter;
import tech.powerjob.server.remote.transport.starter.VertXStarter;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* http transporter powered by vert.x
*
* @author tjq
* @since 2021/2/8
*/
@Slf4j
@Service
public class HttpTransporter implements Transporter {
private final WebClient webClient;
public HttpTransporter() {
WebClientOptions options = new WebClientOptions()
.setKeepAlive(false)
.setConnectTimeout((int) RemoteConstant.DEFAULT_TIMEOUT_MS);
webClient = WebClient.create(Vertx.vertx(), options);
}
@Override
public Protocol getProtocol() {
return Protocol.HTTP;
}
@Override
public String getAddress() {
return VertXStarter.getAddress();
}
@Override
public void tell(String address, PowerSerializable object) {
postRequest(address, object);
}
@Override
public AskResponse ask(String address, PowerSerializable object) throws Exception {
CompletableFuture<HttpResponse<Buffer>> future = postRequest(address, object).toCompletionStage().toCompletableFuture();
HttpResponse<Buffer> httpResponse = future.get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
return httpResponse.bodyAsJson(AskResponse.class);
}
private Future<HttpResponse<Buffer>> postRequest(String address, PowerSerializable object) {
Pair<String, Integer> ipAndPort = NetUtils.splitAddress2IpAndPort(address);
String ip = ipAndPort.getLeft();
int port = ipAndPort.getRight();
return webClient.post(port, ip, object.path())
.sendJson(object)
.onSuccess(res -> log.info("[HttpTransporter] send request to {}{} successfully: {}, response: {}", address, object.path(), object, res))
.onFailure(t -> log.warn("[HttpTransporter] send request to {}{} failed: {}", address, object.path(), object, t));
}
}

View File

@ -1,88 +0,0 @@
package tech.powerjob.server.remote.transport.starter;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import tech.powerjob.server.common.utils.PropertyUtils;
import tech.powerjob.server.remote.server.FriendRequestHandler;
import java.util.Map;
import java.util.Properties;
/**
* 服务端 ActorSystem 启动器
*
* @author tjq
* @since 2020/4/2
*/
@Slf4j
public class AkkaStarter {
public static ActorSystem actorSystem;
@Getter
private static String actorSystemAddress;
private static final String AKKA_PATH = "akka://%s@%s/user/%s";
public static void init() {
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[PowerJob] PowerJob's akka system start to bootstrap...");
// 忽略了一个问题机器是没办法访问外网的除非架设自己的NTP服务器
// TimeUtils.check();
// 解析配置文件
Config akkaFinalConfig = parseConfig();
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(FriendRequestHandler.defaultProps(), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
log.info("[PowerJob] PowerJob's akka system started successfully, using time {}.", stopwatch);
}
private static Config parseConfig() {
Properties properties = PropertyUtils.getProperties();
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.AKKA_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_AKKA_PORT)));
String portFromJvm = System.getProperty(PowerJobServerConfigKey.AKKA_PORT);
if (StringUtils.isNotEmpty(portFromJvm)) {
log.info("[PowerJob] use port from jvm params: {}", portFromJvm);
port = Integer.parseInt(portFromJvm);
}
// 启动 ActorSystem
Map<String, Object> overrideConfig = Maps.newHashMap();
String localIp = NetUtils.getLocalHost();
overrideConfig.put("akka.remote.artery.canonical.hostname", localIp);
overrideConfig.put("akka.remote.artery.canonical.port", port);
actorSystemAddress = localIp + ":" + port;
log.info("[PowerJob] akka-remote server address: {}", actorSystemAddress);
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.SERVER_AKKA_CONFIG_NAME);
return ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
}
/**
* 获取 ServerActor ActorSelection
*
* @param address IP:port
* @return ActorSelection
*/
public static ActorSelection getFriendActor(String address) {
String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
return actorSystem.actorSelection(path);
}
public static ActorSelection getWorkerActor(String address) {
String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.WORKER_ACTOR_NAME);
return actorSystem.actorSelection(path);
}
}

View File

@ -1,47 +0,0 @@
package tech.powerjob.server.remote.transport.starter;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import tech.powerjob.server.common.utils.PropertyUtils;
import com.google.common.base.Stopwatch;
import io.vertx.core.Vertx;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.Properties;
/**
* vert.x starter
*
* @author tjq
* @since 2021/2/8
*/
@Slf4j
public class VertXStarter {
public static Vertx vertx;
@Getter
private static String address;
public static void init() {
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[PowerJob] PowerJob's vert.x system start to bootstrap...");
Properties properties = PropertyUtils.getProperties();
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT)));
String portFromJVM = System.getProperty(PowerJobServerConfigKey.HTTP_PORT);
if (StringUtils.isNotEmpty(portFromJVM)) {
port = Integer.parseInt(portFromJVM);
}
String localIP = NetUtils.getLocalHost();
address = localIP + ":" + port;
log.info("[PowerJob] vert.x server address: {}", address);
vertx = Vertx.vertx();
log.info("[PowerJob] PowerJob's vert.x system started successfully, using time {}.", stopwatch);
}
}

View File

@ -1,8 +1,6 @@
package tech.powerjob.server;
import tech.powerjob.server.common.utils.PropertyUtils;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.remote.transport.starter.VertXStarter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -30,9 +28,6 @@ public class PowerJobServerApplication {
pre();
AkkaStarter.init();
VertXStarter.init();
// Start SpringBoot application.
try {
SpringApplication.run(PowerJobServerApplication.class, args);

View File

@ -1,20 +1,5 @@
package tech.powerjob.server.web.controller;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.common.constants.ContainerSourceType;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.container.ContainerTemplateGenerator;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
import tech.powerjob.server.core.container.ContainerService;
import tech.powerjob.server.web.request.GenerateContainerTemplateRequest;
import tech.powerjob.server.web.request.SaveContainerInfoRequest;
import tech.powerjob.server.web.response.ContainerInfoVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
@ -22,8 +7,21 @@ import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.server.common.constants.ContainerSourceType;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.core.container.ContainerService;
import tech.powerjob.server.core.container.ContainerTemplateGenerator;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
import tech.powerjob.server.web.request.GenerateContainerTemplateRequest;
import tech.powerjob.server.web.request.SaveContainerInfoRequest;
import tech.powerjob.server.web.response.ContainerInfoVO;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.IOException;
@ -114,17 +112,6 @@ public class ContainerController {
return ResultDTO.failed("No workers have even registered");
}
// 转发 HTTP 请求
if (!AkkaStarter.getActorSystemAddress().equals(targetServer)) {
String targetIp = targetServer.split(":")[0];
String url = String.format("http://%s:%d/container/listDeployedWorker?appId=%d&containerId=%d", targetIp, port, appId, containerId);
try {
response.sendRedirect(url);
return ResultDTO.success(null);
}catch (Exception e) {
return ResultDTO.failed(e);
}
}
return ResultDTO.success(containerService.fetchDeployedInfo(appId, containerId));
}

View File

@ -13,7 +13,7 @@ import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.remote.server.election.ServerElectionService;
import tech.powerjob.server.remote.transport.TransportService;
import tech.powerjob.server.remote.tp.TransportService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import java.util.Optional;
@ -55,7 +55,7 @@ public class ServerController {
public ResultDTO<JSONObject> ping(@RequestParam(required = false) boolean debug) {
JSONObject res = new JSONObject();
res.put("localHost", NetUtils.getLocalHost());
res.put("communicationSystemInfo", transportService.getProtocol2Transporter());
res.put("defaultAddress", transportService.defaultProtocol());
res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis()));
res.put("serverTimeZone", TimeZone.getDefault().getDisplayName());
res.put("appIds", workerClusterQueryService.getAppId2ClusterStatus().keySet());

View File

@ -18,6 +18,7 @@ spring.main.allow-circular-references=true
###### PowerJob self-owned configuration (The following properties should exist in application.properties only). ######
# Akka ActorSystem port.
oms.transporter.active.protocols=AKKA,HTTP
oms.akka.port=10086
oms.http.port=10010
# Prefix for all tables. Default empty string. Config if you have needs, i.e. pj_

View File

@ -30,7 +30,7 @@ import static tech.powerjob.common.RemoteConstant.*;
public class TransportUtils {
public static void ttReportInstanceStatus(TaskTrackerReportInstanceStatusReq req, String address, Transporter transporter) {
final URL url = easyBuildUrl(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_REPORT_INSTANCE_STATUS, address);
final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_INSTANCE_STATUS, address);
transporter.tell(url, req);
}
@ -55,12 +55,12 @@ public class TransportUtils {
}
public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {
final URL url = easyBuildUrl(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_REPORT_LOG, address);
final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address);
transporter.tell(url, req);
}
public static void reportWorkerHeartbeat(WorkerHeartbeat req, String address, Transporter transporter) {
final URL url = easyBuildUrl(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_WORKER_HEARTBEAT, address);
final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_WORKER_HEARTBEAT, address);
transporter.tell(url, req);
}
@ -83,17 +83,17 @@ public class TransportUtils {
@SneakyThrows
public static boolean reliableTtReportInstanceStatus(TaskTrackerReportInstanceStatusReq req, String address, Transporter transporter) {
return reliableAsk(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_REPORT_INSTANCE_STATUS, address, req, transporter).isSuccess();
return reliableAsk(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_INSTANCE_STATUS, address, req, transporter).isSuccess();
}
@SneakyThrows
public static AskResponse reliableQueryJobCluster(WorkerQueryExecutorClusterReq req, String address, Transporter transporter) {
return reliableAsk(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_QUERY_JOB_CLUSTER, address, req, transporter);
return reliableAsk(ServerType.SERVER, S4W_PATH, S4W_HANDLER_QUERY_JOB_CLUSTER, address, req, transporter);
}
@SneakyThrows
public static AskResponse reliableQueryContainerInfo(WorkerNeedDeployContainerRequest req, String address, Transporter transporter) {
return reliableAsk(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, address, req, transporter);
return reliableAsk(ServerType.SERVER, S4W_PATH, S4W_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, address, req, transporter);
}
private static AskResponse reliableAsk(ServerType t, String rootPath, String handlerPath, String address, PowerSerializable req, Transporter transporter) throws Exception {