fix: returns success even if the status message fails to be processed in http protocol #209

This commit is contained in:
tjq 2021-02-10 00:01:37 +08:00
parent 5586d48f93
commit dfbf9ec137
6 changed files with 25 additions and 16 deletions

View File

@ -19,5 +19,6 @@ public class OmsConstant {
public static final String COMMA = ",";
public static final String LINE_SEPARATOR = "\r\n";
public static final String HTTP_HEADER_CONTENT_TYPE = "Content-Type";
public static final String JSON_MEDIA_TYPE = "application/json; charset=utf-8";
}

View File

@ -26,7 +26,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor {
.match(WorkerLogReportReq.class, req -> getWorkerRequestHandler().onReceiveWorkerLogReportReq(req))
.match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest)
.match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.matchAny(obj -> log.warn("[WorkerRequestAkkaHandler] receive unknown request: {}.", obj))
.build();
}
@ -44,7 +44,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor {
getSender().tell(AskResponse.succeed(null), getSelf());
}
}catch (Exception e) {
log.error("[ServerActor] update instance status failed for request: {}.", req, e);
log.error("[WorkerRequestAkkaHandler] update instance status failed for request: {}.", req, e);
}
}

View File

@ -58,16 +58,12 @@ public class WorkerRequestHandler {
* 处理 instance 状态
* @param req 任务实例的状态上报请求
*/
public Optional<AskResponse> onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) {
try {
instanceManager.updateStatus(req);
public Optional<AskResponse> onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) throws Exception {
instanceManager.updateStatus(req);
// 结束状态成功/失败需要回复消息
if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) {
return Optional.of(AskResponse.succeed(null));
}
}catch (Exception e) {
log.error("[ServerActor] update instance status failed for request: {}.", req, e);
// 结束状态成功/失败需要回复消息
if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) {
return Optional.of(AskResponse.succeed(null));
}
return Optional.empty();
}

View File

@ -16,6 +16,8 @@ import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import java.util.Properties;
@ -27,6 +29,7 @@ import static com.github.kfcfans.powerjob.server.handler.outer.WorkerRequestHand
* @author tjq
* @since 2021/2/8
*/
@Slf4j
public class WorkerRequestHttpHandler extends AbstractVerticle {
@Override
@ -49,8 +52,13 @@ public class WorkerRequestHttpHandler extends AbstractVerticle {
router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT)
.blockingHandler(ctx -> {
TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class);
getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req);
out(ctx, AskResponse.succeed(null));
try {
getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req);
out(ctx, AskResponse.succeed(null));
} catch (Exception e) {
log.error("[WorkerRequestHttpHandler] update instance status failed for request: {}.", req, e);
out(ctx, AskResponse.failed(ExceptionUtils.getMessage(e)));
}
});
router.post(ProtocolConstant.SERVER_PATH_LOG_REPORT)
.blockingHandler(ctx -> {
@ -63,7 +71,7 @@ public class WorkerRequestHttpHandler extends AbstractVerticle {
private static void out(RoutingContext ctx, Object msg) {
ctx.response()
.putHeader("Content-Type", OmsConstant.JSON_MEDIA_TYPE)
.putHeader(OmsConstant.HTTP_HEADER_CONTENT_TYPE, OmsConstant.JSON_MEDIA_TYPE)
.end(JsonObject.mapFrom(msg).encode());
}

View File

@ -26,7 +26,11 @@ public class AppInfoDO {
// 应用分组密码
private String password;
// 当前负责该 appName 旗下任务调度的server地址IP:Port注意该地址为ActorSystem地址而不是HTTP地址两者端口不同
/**
* 当前负责该 appName 旗下任务调度的server地址IP:Port注意该地址为ActorSystem地址而不是HTTP地址两者端口不同
* 支持多语言后尽管引入了 vert.x 的地址但该字段仍保存 ActorSystem 的地址vert.x 地址仅在返回给 worker 时特殊处理
* 原因框架中很多地方强依赖 currentServer比如根据该地址来获取需要调度的 app
*/
private String currentServer;
private Date gmtCreate;

View File

@ -106,7 +106,7 @@ public class DefaultServerElectionService implements ServerElectionService {
log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
return getProtocolServerAddress(protocol);
}catch (Exception e) {
log.warn("[ServerElection] write new server to db failed for app {}.", appName);
log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
}finally {
lockService.unlock(lockName);
}