mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
chore: remove akka in common package
This commit is contained in:
parent
dedefd5a6d
commit
e74fc2d138
@ -19,7 +19,6 @@
|
||||
<commons.io.version>2.11.0</commons.io.version>
|
||||
<guava.version>31.1-jre</guava.version>
|
||||
<okhttp.version>3.14.9</okhttp.version>
|
||||
<akka.version>2.6.20</akka.version>
|
||||
<kryo.version>5.3.0</kryo.version>
|
||||
<jackson.version>2.14.0-rc1</jackson.version>
|
||||
<junit.version>5.9.0</junit.version>
|
||||
@ -54,18 +53,6 @@
|
||||
<version>${okhttp.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- akka remote -->
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-remote_2.13</artifactId>
|
||||
<version>${akka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-slf4j_2.13</artifactId>
|
||||
<version>${akka.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- commons-io -->
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
|
@ -43,7 +43,6 @@
|
||||
<commons.net.version>3.8.0</commons.net.version>
|
||||
<fastjson.version>1.2.83</fastjson.version>
|
||||
<dingding.version>1.0.1</dingding.version>
|
||||
<vertx-web.version>4.0.2</vertx-web.version>
|
||||
|
||||
<!-- skip this module when deploying. -->
|
||||
<maven.deploy.skip>true</maven.deploy.skip>
|
||||
@ -252,17 +251,6 @@
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-web</artifactId>
|
||||
<version>${vertx-web.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-web-client</artifactId>
|
||||
<version>${vertx-web.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.cronutils</groupId>
|
||||
<artifactId>cron-utils</artifactId>
|
||||
|
@ -1,27 +0,0 @@
|
||||
package tech.powerjob.server.core.handler;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
||||
/**
|
||||
* WorkerRequestHandlerHolder
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/11
|
||||
*/
|
||||
@Component
|
||||
public class WorkerRequestHandlerHolder {
|
||||
|
||||
private static IWorkerRequestHandler workerRequestHandler;
|
||||
|
||||
public WorkerRequestHandlerHolder(IWorkerRequestHandler injectedWorkerRequestHandler) {
|
||||
workerRequestHandler = injectedWorkerRequestHandler;
|
||||
}
|
||||
|
||||
public static IWorkerRequestHandler fetchWorkerRequestHandler() {
|
||||
if (workerRequestHandler == null){
|
||||
throw new IllegalStateException("WorkerRequestHandlerHolder not initialized!");
|
||||
}
|
||||
return workerRequestHandler;
|
||||
}
|
||||
}
|
@ -1,100 +0,0 @@
|
||||
package tech.powerjob.server.core.handler.impl;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.Props;
|
||||
import akka.routing.DefaultResizer;
|
||||
import akka.routing.RoundRobinPool;
|
||||
import tech.powerjob.common.request.*;
|
||||
import tech.powerjob.common.response.AskResponse;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static tech.powerjob.server.core.handler.WorkerRequestHandlerHolder.fetchWorkerRequestHandler;
|
||||
|
||||
/**
|
||||
* 处理 Worker 请求
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/3/30
|
||||
*/
|
||||
@Slf4j
|
||||
public class WorkerRequestAkkaHandler extends AbstractActor {
|
||||
|
||||
|
||||
public static Props defaultProps(){
|
||||
return Props.create(WorkerRequestAkkaHandler.class)
|
||||
.withDispatcher("akka.w-r-c-d")
|
||||
.withRouter(
|
||||
new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)
|
||||
.withResizer(new DefaultResizer(
|
||||
Runtime.getRuntime().availableProcessors() * 4,
|
||||
Runtime.getRuntime().availableProcessors() * 10,
|
||||
1,
|
||||
0.2d,
|
||||
0.3d,
|
||||
0.1d,
|
||||
10
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(WorkerHeartbeat.class, hb -> fetchWorkerRequestHandler().processWorkerHeartbeat(hb))
|
||||
.match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq)
|
||||
.match(WorkerLogReportReq.class, req -> fetchWorkerRequestHandler().processWorkerLogReport(req))
|
||||
.match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest)
|
||||
.match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq)
|
||||
.matchAny(obj -> log.warn("[WorkerRequestAkkaHandler] receive unknown request: {}.", obj))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() throws Exception {
|
||||
super.preStart();
|
||||
log.debug("[WorkerRequestAkkaHandler]init WorkerRequestActor");
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void postStop() throws Exception {
|
||||
super.postStop();
|
||||
log.debug("[WorkerRequestAkkaHandler]stop WorkerRequestActor");
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 instance 状态
|
||||
* @param req 任务实例的状态上报请求
|
||||
*/
|
||||
private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) {
|
||||
|
||||
try {
|
||||
Optional<AskResponse> askResponseOpt = fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req);
|
||||
if (askResponseOpt.isPresent()) {
|
||||
getSender().tell(AskResponse.succeed(null), getSelf());
|
||||
}
|
||||
}catch (Exception e) {
|
||||
log.error("[WorkerRequestAkkaHandler] update instance status failed for request: {}.", req, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 Worker容器部署请求
|
||||
* @param req 容器部署请求
|
||||
*/
|
||||
private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) {
|
||||
getSender().tell(fetchWorkerRequestHandler().processWorkerNeedDeployContainer(req), getSelf());
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 worker 请求获取当前任务所有处理器节点的请求
|
||||
* @param req jobId + appId
|
||||
*/
|
||||
private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) {
|
||||
|
||||
getSender().tell(fetchWorkerRequestHandler().processWorkerQueryExecutorCluster(req), getSelf());
|
||||
}
|
||||
|
||||
}
|
@ -1,81 +0,0 @@
|
||||
package tech.powerjob.server.core.handler.impl;
|
||||
|
||||
import tech.powerjob.common.OmsConstant;
|
||||
import tech.powerjob.common.ProtocolConstant;
|
||||
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
|
||||
import tech.powerjob.common.request.WorkerHeartbeat;
|
||||
import tech.powerjob.common.request.WorkerLogReportReq;
|
||||
import tech.powerjob.common.response.AskResponse;
|
||||
import tech.powerjob.common.response.ResultDTO;
|
||||
import tech.powerjob.server.common.PowerJobServerConfigKey;
|
||||
import tech.powerjob.server.common.utils.PropertyUtils;
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.http.HttpServer;
|
||||
import io.vertx.core.http.HttpServerOptions;
|
||||
import io.vertx.core.json.JsonObject;
|
||||
import io.vertx.ext.web.Router;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import io.vertx.ext.web.handler.BodyHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import static tech.powerjob.server.core.handler.WorkerRequestHandlerHolder.fetchWorkerRequestHandler;
|
||||
|
||||
/**
|
||||
* WorkerRequestHandler
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2021/2/8
|
||||
*/
|
||||
@Slf4j
|
||||
public class WorkerRequestHttpHandler extends AbstractVerticle {
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
|
||||
Properties properties = PropertyUtils.getProperties();
|
||||
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT)));
|
||||
|
||||
HttpServerOptions options = new HttpServerOptions();
|
||||
HttpServer server = vertx.createHttpServer(options);
|
||||
|
||||
Router router = Router.router(vertx);
|
||||
router.route().handler(BodyHandler.create());
|
||||
router.post(ProtocolConstant.SERVER_PATH_HEARTBEAT)
|
||||
.handler(ctx -> {
|
||||
WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class);
|
||||
fetchWorkerRequestHandler().processWorkerHeartbeat(heartbeat);
|
||||
success(ctx);
|
||||
});
|
||||
router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT)
|
||||
.blockingHandler(ctx -> {
|
||||
TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class);
|
||||
try {
|
||||
fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req);
|
||||
out(ctx, AskResponse.succeed(null));
|
||||
} catch (Exception e) {
|
||||
log.error("[WorkerRequestHttpHandler] update instance status failed for request: {}.", req, e);
|
||||
out(ctx, AskResponse.failed(ExceptionUtils.getMessage(e)));
|
||||
}
|
||||
});
|
||||
router.post(ProtocolConstant.SERVER_PATH_LOG_REPORT)
|
||||
.blockingHandler(ctx -> {
|
||||
WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class);
|
||||
fetchWorkerRequestHandler().processWorkerLogReport(req);
|
||||
success(ctx);
|
||||
});
|
||||
server.requestHandler(router).listen(port);
|
||||
}
|
||||
|
||||
private static void out(RoutingContext ctx, Object msg) {
|
||||
ctx.response()
|
||||
.putHeader(OmsConstant.HTTP_HEADER_CONTENT_TYPE, OmsConstant.JSON_MEDIA_TYPE)
|
||||
.end(JsonObject.mapFrom(msg).encode());
|
||||
}
|
||||
|
||||
private static void success(RoutingContext ctx) {
|
||||
out(ctx, ResultDTO.success(null));
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user