feat: add WorkerRequestHttpHandler powered by vert.x

This commit is contained in:
tjq 2021-02-08 20:03:35 +08:00
parent d978f84a60
commit 57501075de
31 changed files with 392 additions and 182 deletions

View File

@ -8,6 +8,9 @@ package com.github.kfcfans.powerjob.common;
*/
public class OmsConstant {
public static final int SERVER_DEFAULT_AKKA_PORT = 10086;
public static final int SERVER_DEFAULT_HTTP_PORT = 10010;
public static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
public static final String TIME_PATTERN_PLUS = "yyyy-MM-dd HH:mm:ss.SSS";

View File

@ -31,6 +31,7 @@
<commons.net.version>3.6</commons.net.version>
<fastjson.version>1.2.68</fastjson.version>
<dingding.version>1.0.1</dingding.version>
<vertx-web.version>4.0.2</vertx-web.version>
<!-- Skip this module when deploying. -->
<maven.deploy.skip>true</maven.deploy.skip>
@ -184,6 +185,12 @@
</exclusions>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
<version>${vertx-web.version}</version>
</dependency>
<!-- swagger2 -->
<dependency>

View File

@ -1,6 +1,8 @@
package com.github.kfcfans.powerjob.server;
import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils;
import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.transport.starter.VertXStarter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -28,8 +30,8 @@ public class OhMyApplication {
pre();
// Init ActorSystem first
OhMyServer.init();
AkkaStarter.init();
VertXStarter.init();
// Start SpringBoot application.
try {
@ -42,6 +44,7 @@ public class OhMyApplication {
private static void pre() {
log.info(TIPS);
PropertyUtils.init();
}
}

View File

@ -9,9 +9,13 @@ package com.github.kfcfans.powerjob.server.common;
public class PowerJobServerConfigKey {
/**
* akka 端口号
* akka 协议端口号
*/
public static final String AKKA_PORT = "oms.akka.port";
/**
* http 协议端口号
*/
public static final String HTTP_PORT = "oms.http.port";
/**
* 自定义数据库表前缀
*/

View File

@ -5,8 +5,8 @@ import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.transport.akka.requests.RemoteProcessReq;
import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.handler.inner.requests.RemoteProcessReq;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import lombok.extern.slf4j.Slf4j;
@ -70,7 +70,7 @@ public class DesignateServerAspect {
String targetServer = appInfo.getCurrentServer();
// 目标IP与本地符合则本地执行
if (Objects.equals(targetServer, OhMyServer.getActorSystemAddress())) {
if (Objects.equals(targetServer, AkkaStarter.getActorSystemAddress())) {
return point.proceed();
}
@ -82,7 +82,7 @@ public class DesignateServerAspect {
.setParameterTypes(parameterTypes)
.setArgs(args);
CompletionStage<Object> askCS = Patterns.ask(OhMyServer.getFriendActor(targetServer), remoteProcessReq, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
CompletionStage<Object> askCS = Patterns.ask(AkkaStarter.getFriendActor(targetServer), remoteProcessReq, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get();
if (!askResponse.isSuccess()) {

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.powerjob.server.transport.akka.actors;
package com.github.kfcfans.powerjob.server.handler.inner;
import akka.actor.AbstractActor;
import com.alibaba.fastjson.JSONObject;
@ -6,9 +6,9 @@ import com.github.kfcfans.powerjob.common.model.WorkerInfo;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.transport.akka.requests.FriendQueryWorkerClusterStatusReq;
import com.github.kfcfans.powerjob.server.transport.akka.requests.Ping;
import com.github.kfcfans.powerjob.server.transport.akka.requests.RemoteProcessReq;
import com.github.kfcfans.powerjob.server.handler.inner.requests.FriendQueryWorkerClusterStatusReq;
import com.github.kfcfans.powerjob.server.handler.inner.requests.Ping;
import com.github.kfcfans.powerjob.server.handler.inner.requests.RemoteProcessReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.util.ReflectionUtils;
@ -23,7 +23,7 @@ import java.util.Map;
* @since 2020/4/9
*/
@Slf4j
public class FriendActor extends AbstractActor {
public class FriendRequestHandler extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.powerjob.server.transport.akka.requests;
package com.github.kfcfans.powerjob.server.handler.inner.requests;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.AllArgsConstructor;

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.powerjob.server.transport.akka.requests;
package com.github.kfcfans.powerjob.server.handler.inner.requests;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.Data;

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.powerjob.server.transport.akka.requests;
package com.github.kfcfans.powerjob.server.handler.inner.requests;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.Getter;

View File

@ -0,0 +1,68 @@
package com.github.kfcfans.powerjob.server.handler.outer;
import akka.actor.AbstractActor;
import com.github.kfcfans.powerjob.common.request.*;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
import static com.github.kfcfans.powerjob.server.handler.outer.WorkerRequestHandler.getWorkerRequestHandler;
/**
* 处理 Worker 请求
*
* @author tjq
* @since 2020/3/30
*/
@Slf4j
public class WorkerRequestAkkaHandler extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(WorkerHeartbeat.class, hb -> getWorkerRequestHandler().onReceiveWorkerHeartbeat(hb))
.match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq)
.match(WorkerLogReportReq.class, req -> getWorkerRequestHandler().onReceiveWorkerLogReportReq(req))
.match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest)
.match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build();
}
/**
* 处理 instance 状态
* @param req 任务实例的状态上报请求
*/
private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) {
try {
Optional<AskResponse> askResponseOpt = getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req);
if (askResponseOpt.isPresent()) {
getSender().tell(AskResponse.succeed(null), getSelf());
}
}catch (Exception e) {
log.error("[ServerActor] update instance status failed for request: {}.", req, e);
}
}
/**
* 处理 Worker容器部署请求
* @param req 容器部署请求
*/
private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) {
getSender().tell(getWorkerRequestHandler().onReceiveWorkerNeedDeployContainerRequest(req), getSelf());
}
/**
* 处理 worker 请求获取当前任务所有处理器节点的请求
* @param req jobId + appId
*/
private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) {
getSender().tell(getWorkerRequestHandler().onReceiveWorkerQueryExecutorClusterReq(req), getSelf());
}
}

View File

@ -1,6 +1,5 @@
package com.github.kfcfans.powerjob.server.transport.akka.actors;
package com.github.kfcfans.powerjob.server.handler.outer;
import akka.actor.AbstractActor;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.model.WorkerInfo;
import com.github.kfcfans.powerjob.common.request.*;
@ -19,40 +18,39 @@ import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* 处理 Worker 请求
* receive and process worker's request
*
* @author tjq
* @since 2020/3/30
* @since 2021/2/8
*/
@Slf4j
public class ServerActor extends AbstractActor {
@Component
public class WorkerRequestHandler {
@Resource
private Environment environment;
@Resource
private InstanceManager instanceManager;
@Resource
private InstanceLogService instanceLogService;
@Resource
private ContainerInfoRepository containerInfoRepository;
@Override
public Receive createReceive() {
return receiveBuilder()
.match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat)
.match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq)
.match(WorkerLogReportReq.class, this::onReceiveWorkerLogReportReq)
.match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest)
.match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build();
}
private static WorkerRequestHandler workerRequestHandler;
/**
* 处理 Worker 的心跳请求
* @param heartbeat 心跳包
*/
private void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) {
public void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) {
WorkerManagerService.updateStatus(heartbeat);
}
@ -60,36 +58,35 @@ public class ServerActor extends AbstractActor {
* 处理 instance 状态
* @param req 任务实例的状态上报请求
*/
private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) {
public Optional<AskResponse> onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) {
try {
getInstanceManager().updateStatus(req);
instanceManager.updateStatus(req);
// 结束状态成功/失败需要回复消息
if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) {
getSender().tell(AskResponse.succeed(null), getSelf());
return Optional.of(AskResponse.succeed(null));
}
}catch (Exception e) {
log.error("[ServerActor] update instance status failed for request: {}.", req, e);
}
return Optional.empty();
}
/**
* 处理OMS在线日志请求
* @param req 日志请求
*/
private void onReceiveWorkerLogReportReq(WorkerLogReportReq req) {
public void onReceiveWorkerLogReportReq(WorkerLogReportReq req) {
// 这个效率应该不会拉垮吧...也就是一些判断 + Map#get ...
SpringUtils.getBean(InstanceLogService.class).submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
}
/**
* 处理 Worker容器部署请求
* @param req 容器部署请求
*/
private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) {
public AskResponse onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) {
ContainerInfoRepository containerInfoRepository = SpringUtils.getBean(ContainerInfoRepository.class);
Environment environment = SpringUtils.getBean(Environment.class);
String port = environment.getProperty("local.server.port");
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(req.getContainerId());
@ -109,14 +106,14 @@ public class ServerActor extends AbstractActor {
askResponse.setData(JsonUtils.toBytes(dpReq));
}
getSender().tell(askResponse, getSelf());
return askResponse;
}
/**
* 处理 worker 请求获取当前任务所有处理器节点的请求
* @param req jobId + appId
*/
private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) {
public AskResponse onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) {
AskResponse askResponse;
@ -137,14 +134,13 @@ public class ServerActor extends AbstractActor {
}else {
askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId);
}
getSender().tell(askResponse, getSelf());
return askResponse;
}
// 不需要加锁 Spring IOC 中重复取并没什么问题
private InstanceManager getInstanceManager() {
if (instanceManager == null) {
instanceManager = SpringUtils.getBean(InstanceManager.class);
public static WorkerRequestHandler getWorkerRequestHandler() {
if (workerRequestHandler == null) {
workerRequestHandler = SpringUtils.getBean(WorkerRequestHandler.class);
}
return instanceManager;
return workerRequestHandler;
}
}

View File

@ -0,0 +1,68 @@
package com.github.kfcfans.powerjob.server.handler.outer;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat;
import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import java.util.Optional;
import java.util.Properties;
import static com.github.kfcfans.powerjob.server.handler.outer.WorkerRequestHandler.getWorkerRequestHandler;
/**
* WorkerRequestHandler
*
* @author tjq
* @since 2021/2/8
*/
public class WorkerRequestHttpHandler extends AbstractVerticle {
private static final String HTTP_PREFIX = "/wrh/v1/";
@Override
public void start() throws Exception {
Properties properties = PropertyUtils.getProperties();
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT)));
HttpServerOptions options = new HttpServerOptions();
HttpServer server = vertx.createHttpServer(options);
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
router.post(HTTP_PREFIX + "heartbeat")
.handler(ctx -> {
WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class);
getWorkerRequestHandler().onReceiveWorkerHeartbeat(heartbeat);
});
router.post(HTTP_PREFIX + "instanceStatusReport")
.blockingHandler(ctx -> {
TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class);
Optional<AskResponse> askResponseOpt = getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req);
askResponseOpt.ifPresent(askResponse -> out(ctx, askResponse));
});
router.post(HTTP_PREFIX + "logReport")
.blockingHandler(ctx -> {
WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class);
getWorkerRequestHandler().onReceiveWorkerLogReportReq(req);
});
server.requestHandler(router).listen(port);
}
private static void out(RoutingContext ctx, Object msg) {
ctx.response()
.putHeader("Content-Type", OmsConstant.JSON_MEDIA_TYPE)
.end(JsonObject.mapFrom(msg).encode());
}
}

View File

@ -10,7 +10,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
@ -126,7 +126,7 @@ public class ContainerService {
ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId());
WorkerManagerService.getActiveWorkerInfo(container.getAppId()).keySet().forEach(akkaAddress -> {
ActorSelection workerActor = OhMyServer.getWorkerActor(akkaAddress);
ActorSelection workerActor = AkkaStarter.getWorkerActor(akkaAddress);
workerActor.tell(destroyRequest, null);
});
@ -260,7 +260,7 @@ public class ContainerService {
AtomicInteger count = new AtomicInteger();
workerAddressList.forEach(akkaAddress -> {
ActorSelection workerActor = OhMyServer.getWorkerActor(akkaAddress);
ActorSelection workerActor = AkkaStarter.getWorkerActor(akkaAddress);
workerActor.tell(req, null);
remote.sendText("SYSTEM: send deploy request to " + akkaAddress);

View File

@ -165,7 +165,7 @@ public class DispatchService {
WorkerInfo taskTracker = allAvailableWorker.get(0);
String taskTrackerAddress = taskTracker.getAddress();
transportService.transfer(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req);
transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req);
log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req);
// 修改状态

View File

@ -85,6 +85,10 @@ public class ClusterStatusHolder {
return workers;
}
public WorkerInfo getWorkerInfo(String address) {
return address2WorkerInfo.get(address);
}
public List<WorkerInfo> getAvailableWorkers(double minCPUCores, double minMemorySpace, double minDiskSpace) {
List<WorkerInfo> workerInfos = Lists.newArrayList();
address2WorkerInfo.forEach((address, workerInfo) -> {

View File

@ -4,8 +4,8 @@ import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.transport.akka.requests.Ping;
import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.handler.inner.requests.Ping;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.extension.LockService;
@ -50,7 +50,7 @@ public class ServerSelectService {
public String getServer(Long appId, String currentServer) {
if (!accurate()) {
// 如果是本机就不需要查数据库那么复杂的操作了直接返回成功
if (OhMyServer.getActorSystemAddress().equals(currentServer)) {
if (AkkaStarter.getActorSystemAddress().equals(currentServer)) {
return currentServer;
}
}
@ -93,7 +93,7 @@ public class ServerSelectService {
}
// 篡位本机作为Server
appInfo.setCurrentServer(OhMyServer.getActorSystemAddress());
appInfo.setCurrentServer(AkkaStarter.getActorSystemAddress());
appInfo.setGmtModified(new Date());
appInfoRepository.saveAndFlush(appInfo);
@ -123,14 +123,14 @@ public class ServerSelectService {
return false;
}
if (OhMyServer.getActorSystemAddress().equals(serverAddress)) {
if (AkkaStarter.getActorSystemAddress().equals(serverAddress)) {
return true;
}
Ping ping = new Ping();
ping.setCurrentTime(System.currentTimeMillis());
ActorSelection serverActor = OhMyServer.getFriendActor(serverAddress);
ActorSelection serverActor = AkkaStarter.getFriendActor(serverAddress);
try {
CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);

View File

@ -7,10 +7,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
/**
* Worker 管理服务
@ -56,6 +53,15 @@ public class WorkerManagerService {
return clusterStatusHolder.getAvailableWorkers(minCPUCores, minMemorySpace, minDiskSpace);
}
public static Optional<WorkerInfo> getWorkerInfo(Long appId, String address) {
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
if (clusterStatusHolder == null) {
log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
return Optional.empty();
}
return Optional.ofNullable(clusterStatusHolder.getWorkerInfo(address));
}
/**
* 清理不需要的worker信息
* @param usingAppIds 需要维护的appId其余的数据将被删除

View File

@ -1,14 +1,12 @@
package com.github.kfcfans.powerjob.server.service.instance;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.*;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.model.WorkerInfo;
import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.response.InstanceInfoDTO;
import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.constans.InstanceType;
import com.github.kfcfans.powerjob.server.common.redirect.DesignateServer;
import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils;
@ -18,17 +16,17 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
import com.github.kfcfans.powerjob.server.transport.TransportService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.Optional;
import java.util.stream.Collectors;
import static com.github.kfcfans.powerjob.common.InstanceStatus.RUNNING;
@ -44,6 +42,8 @@ import static com.github.kfcfans.powerjob.common.InstanceStatus.STOPPED;
@Service
public class InstanceService {
@Resource
private TransportService transportService;
@Resource
private DispatchService dispatchService;
@Resource
@ -115,11 +115,15 @@ public class InstanceService {
不可靠通知停止 TaskTracker
假如没有成功关闭之后 TaskTracker 会再次 reportStatus按照流程instanceLog 会被更新为 RUNNING开发者可以再次手动关闭
*/
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(instanceInfo.getTaskTrackerAddress());
Optional<WorkerInfo> workerInfoOpt = WorkerManagerService.getWorkerInfo(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress());
if (workerInfoOpt.isPresent()) {
ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId);
taskTrackerActor.tell(req, null);
log.info("[Instance-{}] update instanceInfo and send request succeed.", instanceId);
WorkerInfo workerInfo = workerInfoOpt.get();
transportService.tell(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), req);
log.info("[Instance-{}] update instanceInfo and send 'stopInstance' request succeed.", instanceId);
} else {
log.warn("[Instance-{}] update instanceInfo successfully but can't find TaskTracker to stop instance", instanceId);
}
}catch (IllegalArgumentException ie) {
throw ie;
@ -248,13 +252,12 @@ public class InstanceService {
return detail;
}
// 运行状态下交由 TaskTracker 返回相关信息
try {
Optional<WorkerInfo> workerInfoOpt = WorkerManagerService.getWorkerInfo(instanceInfoDO.getAppId(), instanceInfoDO.getTaskTrackerAddress());
if (workerInfoOpt.isPresent()) {
WorkerInfo workerInfo = workerInfoOpt.get();
ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId);
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(instanceInfoDO.getTaskTrackerAddress());
CompletionStage<Object> askCS = Patterns.ask(taskTrackerActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
try {
AskResponse askResponse = transportService.ask(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), req);
if (askResponse.isSuccess()) {
InstanceDetail instanceDetail = askResponse.getData(InstanceDetail.class);
instanceDetail.setRunningTimes(instanceInfoDO.getRunningTimes());
@ -263,10 +266,10 @@ public class InstanceService {
}else {
log.warn("[Instance-{}] ask InstanceStatus from TaskTracker failed, the message is {}.", instanceId, askResponse.getMessage());
}
} catch (Exception e) {
log.warn("[Instance-{}] ask InstanceStatus from TaskTracker failed, exception is {}", instanceId, e.toString());
}
}
// 失败则返回基础版信息
BeanUtils.copyProperties(instanceInfoDO, detail);

View File

@ -5,7 +5,7 @@ import com.github.kfcfans.powerjob.common.SystemInstanceResult;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.persistence.core.model.*;
import com.github.kfcfans.powerjob.server.persistence.core.repository.*;
import com.github.kfcfans.powerjob.server.service.DispatchService;
@ -65,7 +65,7 @@ public class InstanceStatusCheckService {
Stopwatch stopwatch = Stopwatch.createStarted();
// 查询DB获取该Server需要负责的AppGroup
List<AppInfoDO> appInfoList = appInfoRepository.findAllByCurrentServer(OhMyServer.getActorSystemAddress());
List<AppInfoDO> appInfoList = appInfoRepository.findAllByCurrentServer(AkkaStarter.getActorSystemAddress());
if (CollectionUtils.isEmpty(appInfoList)) {
log.info("[InstanceStatusChecker] current server has no app's job to check");
return;

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.timing.schedule;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
@ -76,7 +76,7 @@ public class OmsScheduleService {
Stopwatch stopwatch = Stopwatch.createStarted();
// 先查询DB查看本机需要负责的任务
List<AppInfoDO> allAppInfos = appInfoRepository.findAllByCurrentServer(OhMyServer.getActorSystemAddress());
List<AppInfoDO> allAppInfos = appInfoRepository.findAllByCurrentServer(AkkaStarter.getActorSystemAddress());
if (CollectionUtils.isEmpty(allAppInfos)) {
log.info("[JobScheduleService] current server has no app's job to schedule.");
return;

View File

@ -2,11 +2,13 @@ package com.github.kfcfans.powerjob.server.transport;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.Protocol;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -30,12 +32,21 @@ public class TransportService {
});
}
public void transfer(Protocol protocol, String address, OmsSerializable object) {
public void tell(Protocol protocol, String address, OmsSerializable object) {
Transporter transporter = protocol2Transporter.get(protocol);
if (transporter == null) {
log.error("[TransportService] can't find transporter by protocol[{}], this is a bug!", protocol);
return;
}
transporter.transfer(address, object);
transporter.tell(address, object);
}
public AskResponse ask(Protocol protocol, String address, OmsSerializable object) throws Exception {
Transporter transporter = protocol2Transporter.get(protocol);
if (transporter == null) {
log.error("[TransportService] can't find transporter by protocol[{}], this is a bug!", protocol);
throw new IOException("can't find transporter by protocol: " + protocol);
}
return transporter.ask(address, object);
}
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.transport;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.Protocol;
import com.github.kfcfans.powerjob.common.response.AskResponse;
/**
* Transporter
@ -15,5 +16,7 @@ public interface Transporter {
String getAddress();
void transfer(String address, OmsSerializable object);
void tell(String address, OmsSerializable object);
AskResponse ask(String address, OmsSerializable object) throws Exception;
}

View File

@ -1,33 +0,0 @@
package com.github.kfcfans.powerjob.server.transport.akka;
import akka.actor.ActorSelection;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.Protocol;
import com.github.kfcfans.powerjob.server.transport.Transporter;
import org.springframework.stereotype.Service;
/**
* akka transporter
*
* @author tjq
* @since 2021/2/7
*/
@Service
public class AkkaTransporter implements Transporter {
@Override
public Protocol getProtocol() {
return Protocol.AKKA;
}
@Override
public String getAddress() {
return OhMyServer.getActorSystemAddress();
}
@Override
public void transfer(String address, OmsSerializable object) {
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(address);
taskTrackerActor.tell(object, null);
}
}

View File

@ -1,25 +0,0 @@
package com.github.kfcfans.powerjob.server.transport.akka.actors;
import akka.actor.AbstractActor;
import akka.actor.DeadLetter;
import lombok.extern.slf4j.Slf4j;
/**
* 处理 server 异常信息的 actor
*
* @author tjq
* @since 2020/7/18
*/
@Slf4j
public class ServerTroubleshootingActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(DeadLetter.class, this::onReceiveDeadLetter)
.build();
}
public void onReceiveDeadLetter(DeadLetter dl) {
log.warn("[ServerTroubleshootingActor] receive DeadLetter: {}", dl);
}
}

View File

@ -0,0 +1,48 @@
package com.github.kfcfans.powerjob.server.transport.impl;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.Protocol;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.transport.Transporter;
import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* akka transporter
*
* @author tjq
* @since 2021/2/7
*/
@Service
public class AkkaTransporter implements Transporter {
@Override
public Protocol getProtocol() {
return Protocol.AKKA;
}
@Override
public String getAddress() {
return AkkaStarter.getActorSystemAddress();
}
@Override
public void tell(String address, OmsSerializable object) {
ActorSelection taskTrackerActor = AkkaStarter.getTaskTrackerActor(address);
taskTrackerActor.tell(object, null);
}
@Override
public AskResponse ask(String address, OmsSerializable object) throws Exception {
ActorSelection taskTrackerActor = AkkaStarter.getTaskTrackerActor(address);
CompletionStage<Object> askCS = Patterns.ask(taskTrackerActor, object, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
return (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
}

View File

@ -1,17 +1,19 @@
package com.github.kfcfans.powerjob.server.transport.akka;
package com.github.kfcfans.powerjob.server.transport.starter;
import akka.actor.*;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.routing.RoundRobinPool;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.transport.akka.actors.FriendActor;
import com.github.kfcfans.powerjob.server.transport.akka.actors.ServerActor;
import com.github.kfcfans.powerjob.server.transport.akka.actors.ServerTroubleshootingActor;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils;
import com.github.kfcfans.powerjob.server.handler.inner.FriendRequestHandler;
import com.github.kfcfans.powerjob.server.handler.outer.WorkerRequestAkkaHandler;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@ -32,7 +34,7 @@ import java.util.concurrent.CompletionStage;
* @since 2020/4/2
*/
@Slf4j
public class OhMyServer {
public class AkkaStarter {
public static ActorSystem actorSystem;
@Getter
@ -43,18 +45,17 @@ public class OhMyServer {
public static void init() {
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[OhMyServer] OhMyServer's akka system start to bootstrap...");
log.info("[PowerJob] PowerJob's akka system start to bootstrap...");
// 忽略了一个问题机器是没办法访问外网的除非架设自己的NTP服务器
// TimeUtils.check();
// 解析配置文件
PropertyUtils.init();
Properties properties = PropertyUtils.getProperties();
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.AKKA_PORT, "10086"));
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.AKKA_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_AKKA_PORT)));
String portFromJVM = System.getProperty(PowerJobServerConfigKey.AKKA_PORT);
if (StringUtils.isNotEmpty(portFromJVM)) {
log.info("[OhMyWorker] use port from jvm params: {}", portFromJVM);
log.info("[PowerJob] use port from jvm params: {}", portFromJVM);
port = Integer.parseInt(portFromJVM);
}
@ -64,22 +65,18 @@ public class OhMyServer {
overrideConfig.put("akka.remote.artery.canonical.hostname", localIP);
overrideConfig.put("akka.remote.artery.canonical.port", port);
actorSystemAddress = localIP + ":" + port;
log.info("[OhMyWorker] akka-remote server address: {}", actorSystemAddress);
log.info("[PowerJob] akka-remote server address: {}", actorSystemAddress);
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.SERVER_AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(ServerActor.class)
actorSystem.actorOf(Props.create(WorkerRequestAkkaHandler.class)
.withDispatcher("akka.server-actor-dispatcher")
.withRouter(new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)), RemoteConstant.SERVER_ACTOR_NAME);
actorSystem.actorOf(Props.create(FriendActor.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
actorSystem.actorOf(Props.create(FriendRequestHandler.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
// 处理系统中产生的异常情况
ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(ServerTroubleshootingActor.class), RemoteConstant.SERVER_TROUBLESHOOTING_ACTOR_NAME);
actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
log.info("[OhMyServer] OhMyServer's akka system start successfully, using time {}.", stopwatch);
log.info("[PowerJob] PowerJob's akka system started successfully, using time {}.", stopwatch);
}
/**

View File

@ -0,0 +1,47 @@
package com.github.kfcfans.powerjob.server.transport.starter;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils;
import com.github.kfcfans.powerjob.server.handler.outer.WorkerRequestHttpHandler;
import com.google.common.base.Stopwatch;
import io.vertx.core.Vertx;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.Properties;
/**
* vert.x starter
*
* @author tjq
* @since 2021/2/8
*/
@Slf4j
public class VertXStarter {
@Getter
private static String address;
public static void init() {
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[PowerJob] PowerJob's vert.x system start to bootstrap...");
Properties properties = PropertyUtils.getProperties();
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT)));
String portFromJVM = System.getProperty(PowerJobServerConfigKey.HTTP_PORT);
if (StringUtils.isNotEmpty(portFromJVM)) {
port = Integer.parseInt(portFromJVM);
}
String localIP = NetUtils.getLocalHost();
address = localIP + ":" + port;
log.info("[PowerJob] vert.x server address: {}", address);
Vertx.vertx().deployVerticle(new WorkerRequestHttpHandler());
log.info("[PowerJob] PowerJob's vert.x system started successfully, using time {}.", stopwatch);
}
}

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.web.controller;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.ContainerTemplateGenerator;
@ -102,7 +102,7 @@ public class ContainerController {
}
// 转发 HTTP 请求
if (!OhMyServer.getActorSystemAddress().equals(targetServer)) {
if (!AkkaStarter.getActorSystemAddress().equals(targetServer)) {
String targetIp = targetServer.split(":")[0];
String url = String.format("http://%s:%d/container/listDeployedWorker?appId=%d&containerId=%d", targetIp, port, appId, containerId);
try {

View File

@ -5,7 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.service.ha.ServerSelectService;
@ -51,7 +51,7 @@ public class ServerController {
public ResultDTO<JSONObject> ping(@RequestParam(required = false) boolean debug) {
JSONObject res = new JSONObject();
res.put("localHost", NetUtils.getLocalHost());
res.put("actorSystemAddress", OhMyServer.getActorSystemAddress());
res.put("actorSystemAddress", AkkaStarter.getActorSystemAddress());
res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis()));
res.put("serverTimeZone", TimeZone.getDefault().getDisplayName());
res.put("appIds", WorkerManagerService.getAppId2ClusterStatus().keySet());

View File

@ -5,13 +5,12 @@ import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.model.SystemMetrics;
import com.github.kfcfans.powerjob.common.model.WorkerInfo;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.server.transport.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.transport.akka.requests.FriendQueryWorkerClusterStatusReq;
import com.github.kfcfans.powerjob.server.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.handler.inner.requests.FriendQueryWorkerClusterStatusReq;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
@ -69,7 +68,7 @@ public class SystemInfoController {
// 重定向到指定 Server 获取集群信息
FriendQueryWorkerClusterStatusReq req = new FriendQueryWorkerClusterStatusReq(appId);
try {
ActorSelection friendActor = OhMyServer.getFriendActor(server);
ActorSelection friendActor = AkkaStarter.getFriendActor(server);
CompletionStage<Object> askCS = Patterns.ask(friendActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);

View File

@ -16,5 +16,6 @@ spring.servlet.multipart.max-request-size=209715200
###### PowerJob self-owned configuration (The following properties should exist in application.properties only). ######
# Akka ActorSystem port.
oms.akka.port=10086
oms.http.port=10010
# Prefix for all tables. Default empty string. Config if you have needs, i.e. pj_
oms.table-prefix=