diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java index ee2b517d..07f02e94 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java @@ -10,6 +10,7 @@ import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.request.ServerStopInstanceReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; +import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.remote.framework.base.URL; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.common.timewheel.holder.HashedWheelTimerHolder; @@ -81,6 +82,14 @@ public class InstanceManager implements TransportServiceAware { log.warn("[InstanceManager-{}] can't find InstanceInfo from database", instanceId); return; } + + // 考虑极端情况:Processor 处理耗时小于 server 写 DB 耗时,会导致状态上报时无 taskTracker 地址,此处等待后重新从DB获取数据 GitHub#620 + if (StringUtils.isEmpty(instanceInfo.getTaskTrackerAddress())) { + log.warn("[InstanceManager-{}] TaskTrackerAddress is empty, server will wait then acquire again!", instanceId); + CommonUtils.easySleep(277); + instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); + } + int originStatus = instanceInfo.getStatus(); // 丢弃过期的上报数据 if (req.getReportTime() <= instanceInfo.getLastReportTime()) { diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/ZeroCostTestProcessor.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/ZeroCostTestProcessor.java new file mode 100644 index 00000000..4f4fdf4a --- /dev/null +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/ZeroCostTestProcessor.java @@ -0,0 +1,18 @@ +package tech.powerjob.samples.processors.test; + +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.sdk.BasicProcessor; + +/** + * ZeroCostTestProcessor + * + * @author tjq + * @since 2023/5/7 + */ +public class ZeroCostTestProcessor implements BasicProcessor { + @Override + public ProcessResult process(TaskContext context) throws Exception { + return new ProcessResult(true, "zero cost"); + } +}