fix: server return Optional to worker

This commit is contained in:
tjq 2023-01-22 10:52:53 +08:00
parent 63a5e2b458
commit dca97010c7
6 changed files with 21 additions and 23 deletions

View File

@ -53,7 +53,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
protected abstract void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event);
protected abstract Optional<AskResponse> processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception;
protected abstract AskResponse processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception;
protected abstract void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event);
@ -77,7 +77,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
@Override
@Handler(path = S4W_HANDLER_REPORT_INSTANCE_STATUS, processType = ProcessType.BLOCKING)
public Optional<AskResponse> processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req) {
public AskResponse processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req) {
long startMs = System.currentTimeMillis();
TtReportInstanceStatusEvent event = new TtReportInstanceStatusEvent()
.setAppId(req.getAppId())
@ -92,7 +92,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
} catch (Exception e) {
event.setServerProcessStatus(TtReportInstanceStatusEvent.Status.FAILED);
log.error("[WorkerRequestHandler] processTaskTrackerReportInstanceStatus failed for request: {}", req, e);
return Optional.of(AskResponse.failed(ExceptionUtils.getMessage(e)));
return AskResponse.failed(ExceptionUtils.getMessage(e));
} finally {
event.setServerProcessCost(System.currentTimeMillis() - startMs);
monitorService.monitor(event);

View File

@ -3,8 +3,6 @@ package tech.powerjob.server.core.handler;
import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse;
import java.util.Optional;
/**
* 定义 server worker 之间需要处理的协议
*
@ -24,7 +22,7 @@ public interface IWorkerRequestHandler {
* @param req 上报请求
* @return 响应信息
*/
Optional<AskResponse> processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req);
AskResponse processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req);
/**
* 处理 worker 查询执行器集群

View File

@ -22,8 +22,6 @@ import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepositor
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import java.util.Optional;
/**
* receive and process worker's request
*
@ -55,7 +53,7 @@ public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler {
}
@Override
protected Optional<AskResponse> processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception {
protected AskResponse processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception {
// 2021/02/05 如果是工作流中的实例先尝试更新上下文信息再更新实例状态这里一定不会有异常
if (req.getWfInstanceId() != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) {
// 更新工作流上下文信息
@ -66,9 +64,10 @@ public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler {
// 结束状态成功/失败需要回复消息
if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) {
return Optional.of(AskResponse.succeed(null));
return AskResponse.succeed(null);
}
return Optional.empty();
return null;
}
@Override

View File

@ -45,6 +45,13 @@ public class PowerTransportService implements TransportService, InitializingBean
@Value("${oms.transporter.active.protocols}")
private String activeProtocols;
/**
* 主要通讯协议用于 server server 之间的通讯用户必须保证该协议可用
*/
@Value("${oms.transporter.main.protocol}")
private String mainProtocol;
private static final String PROTOCOL_PORT_CONFIG = "oms.%s.port";
private final Environment environment;
@ -170,16 +177,10 @@ public class PowerTransportService implements TransportService, InitializingBean
* HTTP 优先否则默认取第一个协议
*/
private void choseDefault() {
ProtocolInfo httpP = protocolName2Info.get(Protocol.HTTP.name());
if (httpP != null) {
log.info("[PowerTransportService] exist HTTP protocol, chose this as the default protocol!");
this.defaultProtocol = httpP;
return;
}
String firstProtocol = activeProtocols.split(OmsConstant.COMMA)[0];
this.defaultProtocol = this.protocolName2Info.get(firstProtocol);
log.info("[PowerTransportService] chose [{}] as the default protocol!", firstProtocol);
this.defaultProtocol = this.protocolName2Info.get(mainProtocol);
log.info("[PowerTransportService] chose [{}] as the default protocol, make sure this protocol can work!", mainProtocol);
if (this.defaultProtocol == null) {
throw new IllegalArgumentException("can't find default protocol, please check your config!");

View File

@ -16,9 +16,9 @@ spring.servlet.multipart.max-request-size=209715200
# temporary skip circular references check
spring.main.allow-circular-references=true
###### PowerJob self-owned configuration (The following properties should exist in application.properties only). ######
# Akka ActorSystem port.
###### PowerJob transporter configuration ######
oms.transporter.active.protocols=AKKA,HTTP
oms.transporter.main.protocol=HTTP
oms.akka.port=10086
oms.http.port=10010
# Prefix for all tables. Default empty string. Config if you have needs, i.e. pj_

View File

@ -46,7 +46,7 @@ public class BuiltInSpringProcessorFactory implements ProcessorFactory {
.setProcessor(basicProcessor)
.setClassLoader(basicProcessor.getClass().getClassLoader());
} catch (Throwable t) {
log.warn("[ProcessorFactory] load by BuiltInSpringProcessorFactory failed!", t);
log.warn("[ProcessorFactory] load by BuiltInSpringProcessorFactory failed. If you are using Spring, make sure this bean was managed by Spring", t);
}
return null;