From dfbf9ec13782a3ee7110c87a4ead1d346c30336c Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 10 Feb 2021 00:01:37 +0800 Subject: [PATCH] fix: returns success even if the status message fails to be processed in http protocol #209 --- .../kfcfans/powerjob/common/OmsConstant.java | 1 + .../handler/outer/WorkerRequestAkkaHandler.java | 4 ++-- .../server/handler/outer/WorkerRequestHandler.java | 14 +++++--------- .../handler/outer/WorkerRequestHttpHandler.java | 14 +++++++++++--- .../server/persistence/core/model/AppInfoDO.java | 6 +++++- .../service/ha/DefaultServerElectionService.java | 2 +- 6 files changed, 25 insertions(+), 16 deletions(-) diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsConstant.java index 79308013..d0aa2f05 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsConstant.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/OmsConstant.java @@ -19,5 +19,6 @@ public class OmsConstant { public static final String COMMA = ","; public static final String LINE_SEPARATOR = "\r\n"; + public static final String HTTP_HEADER_CONTENT_TYPE = "Content-Type"; public static final String JSON_MEDIA_TYPE = "application/json; charset=utf-8"; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestAkkaHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestAkkaHandler.java index c2ae96f8..00ebef8f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestAkkaHandler.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestAkkaHandler.java @@ -26,7 +26,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor { .match(WorkerLogReportReq.class, req -> getWorkerRequestHandler().onReceiveWorkerLogReportReq(req)) .match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest) .match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq) - .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) + .matchAny(obj -> log.warn("[WorkerRequestAkkaHandler] receive unknown request: {}.", obj)) .build(); } @@ -44,7 +44,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor { getSender().tell(AskResponse.succeed(null), getSelf()); } }catch (Exception e) { - log.error("[ServerActor] update instance status failed for request: {}.", req, e); + log.error("[WorkerRequestAkkaHandler] update instance status failed for request: {}.", req, e); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHandler.java index b6a350e5..f5f6ea01 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHandler.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHandler.java @@ -58,16 +58,12 @@ public class WorkerRequestHandler { * 处理 instance 状态 * @param req 任务实例的状态上报请求 */ - public Optional onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) { - try { - instanceManager.updateStatus(req); + public Optional onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) throws Exception { + instanceManager.updateStatus(req); - // 结束状态(成功/失败)需要回复消息 - if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) { - return Optional.of(AskResponse.succeed(null)); - } - }catch (Exception e) { - log.error("[ServerActor] update instance status failed for request: {}.", req, e); + // 结束状态(成功/失败)需要回复消息 + if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) { + return Optional.of(AskResponse.succeed(null)); } return Optional.empty(); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHttpHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHttpHandler.java index d42903f1..ccb6ad6e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHttpHandler.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHttpHandler.java @@ -16,6 +16,8 @@ import io.vertx.core.json.JsonObject; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.BodyHandler; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; import java.util.Properties; @@ -27,6 +29,7 @@ import static com.github.kfcfans.powerjob.server.handler.outer.WorkerRequestHand * @author tjq * @since 2021/2/8 */ +@Slf4j public class WorkerRequestHttpHandler extends AbstractVerticle { @Override @@ -49,8 +52,13 @@ public class WorkerRequestHttpHandler extends AbstractVerticle { router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT) .blockingHandler(ctx -> { TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class); - getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req); - out(ctx, AskResponse.succeed(null)); + try { + getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req); + out(ctx, AskResponse.succeed(null)); + } catch (Exception e) { + log.error("[WorkerRequestHttpHandler] update instance status failed for request: {}.", req, e); + out(ctx, AskResponse.failed(ExceptionUtils.getMessage(e))); + } }); router.post(ProtocolConstant.SERVER_PATH_LOG_REPORT) .blockingHandler(ctx -> { @@ -63,7 +71,7 @@ public class WorkerRequestHttpHandler extends AbstractVerticle { private static void out(RoutingContext ctx, Object msg) { ctx.response() - .putHeader("Content-Type", OmsConstant.JSON_MEDIA_TYPE) + .putHeader(OmsConstant.HTTP_HEADER_CONTENT_TYPE, OmsConstant.JSON_MEDIA_TYPE) .end(JsonObject.mapFrom(msg).encode()); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/AppInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/AppInfoDO.java index a06d2766..c33387e3 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/AppInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/AppInfoDO.java @@ -26,7 +26,11 @@ public class AppInfoDO { // 应用分组密码 private String password; - // 当前负责该 appName 旗下任务调度的server地址,IP:Port(注意,该地址为ActorSystem地址,而不是HTTP地址,两者端口不同) + /** + * 当前负责该 appName 旗下任务调度的server地址,IP:Port(注意,该地址为ActorSystem地址,而不是HTTP地址,两者端口不同) + * 支持多语言后,尽管引入了 vert.x 的地址,但该字段仍保存 ActorSystem 的地址,vert.x 地址仅在返回给 worker 时特殊处理 + * 原因:框架中很多地方强依赖 currentServer,比如根据该地址来获取需要调度的 app + */ private String currentServer; private Date gmtCreate; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/DefaultServerElectionService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/DefaultServerElectionService.java index 054ddba9..6f52ac25 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/DefaultServerElectionService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/DefaultServerElectionService.java @@ -106,7 +106,7 @@ public class DefaultServerElectionService implements ServerElectionService { log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId); return getProtocolServerAddress(protocol); }catch (Exception e) { - log.warn("[ServerElection] write new server to db failed for app {}.", appName); + log.error("[ServerElection] write new server to db failed for app {}.", appName, e); }finally { lockService.unlock(lockName); }