fix FrequentTaskTracker's StandaloneJob always success bug

This commit is contained in:
tjq 2020-04-17 20:06:00 +08:00
parent 593ee714b8
commit 072d94f283
7 changed files with 45 additions and 9 deletions

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.server.akka.actors; package com.github.kfcfans.oms.server.akka.actors;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.request.WorkerHeartbeat; import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.common.response.AskResponse;
@ -45,8 +46,10 @@ public class ServerActor extends AbstractActor {
try { try {
InstanceManager.updateStatus(req); InstanceManager.updateStatus(req);
// 回复接收成功 // 结束状态成功/失败需要回复消息
getSender().tell(AskResponse.succeed(null), getSelf()); if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) {
getSender().tell(AskResponse.succeed(null), getSelf());
}
}catch (Exception e) { }catch (Exception e) {
log.error("[ServerActor] update instance status failed for request: {}.", req, e); log.error("[ServerActor] update instance status failed for request: {}.", req, e);
} }

View File

@ -6,6 +6,7 @@ import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.TaskResult; import com.github.kfcfans.oms.worker.core.processor.TaskResult;
import com.github.kfcfans.oms.worker.core.processor.sdk.BroadcastProcessor; import com.github.kfcfans.oms.worker.core.processor.sdk.BroadcastProcessor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -18,6 +19,7 @@ import java.util.concurrent.ThreadLocalRandom;
* @since 2020/4/17 * @since 2020/4/17
*/ */
@Slf4j @Slf4j
@Component
public class BroadcastProcessorDemo extends BroadcastProcessor { public class BroadcastProcessorDemo extends BroadcastProcessor {
@Override @Override

View File

@ -12,6 +12,7 @@ import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.ToString; import lombok.ToString;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -24,6 +25,7 @@ import java.util.concurrent.ThreadLocalRandom;
* @since 2020/4/17 * @since 2020/4/17
*/ */
@Slf4j @Slf4j
@Component
public class MapReduceProcessorDemo extends MapReduceProcessor { public class MapReduceProcessorDemo extends MapReduceProcessor {
// 每一批发送任务大小 // 每一批发送任务大小

View File

@ -5,6 +5,7 @@ import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext; import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -16,15 +17,16 @@ import java.util.concurrent.ThreadLocalRandom;
* @since 2020/4/17 * @since 2020/4/17
*/ */
@Slf4j @Slf4j
@Component
public class StandaloneProcessorDemo implements BasicProcessor { public class StandaloneProcessorDemo implements BasicProcessor {
@Override @Override
public ProcessResult process(TaskContext context) throws Exception { public ProcessResult process(TaskContext context) throws Exception {
System.out.println("================ StandaloneProcessorDemo#process ================"); System.out.println("================ StandaloneProcessorDemo#process ================");
System.out.println("TaskContext: " + JSONObject.toJSONString(context));
boolean success = ThreadLocalRandom.current().nextBoolean(); boolean success = ThreadLocalRandom.current().nextBoolean();
System.out.println("TaskContext: " + JSONObject.toJSONString(context));
System.out.println("ProcessSuccess: " + success);
return new ProcessResult(success, context + ": " + success); return new ProcessResult(success, context + ": " + success);
} }
} }

View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- %m输出的信息,%p日志级别,%t线程名,%d日期,%c类的全名,%i索引【从数字0开始递增】,,, -->
<!-- appender是configuration的子节点是负责写日志的组件。 -->
<!-- ConsoleAppender把日志输出到控制台 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!-- <pattern>%red(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) %green([%thread]) - %cyan(%msg%n)</pattern>-->
<pattern>%red(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) - %cyan(%msg%n)</pattern>
<!-- 控制台也要使用UTF-8不要使用GBK否则会中文乱码 -->
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="com.zaxxer.hikari" level="INFO">
<appender-ref ref="STDOUT"/>
</logger>
<logger name="com.github.kfcfans.oms" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>
<!-- 控制台输出日志级别 -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

View File

@ -257,17 +257,17 @@ public class FrequentTaskTracker extends TaskTracker {
continue; continue;
} }
String result;
switch (executeType) { switch (executeType) {
// STANDALONE 代表任务确实已经执行完毕了 // STANDALONE 代表任务确实已经执行完毕了
case STANDALONE: case STANDALONE:
// 查询数据库获取结果STANDALONE每个SubInstance只会有一条Task记录 // 查询数据库获取结果STANDALONE每个SubInstance只会有一条Task记录
result = taskPersistenceService.getAllTask(instanceId, subInstanceId).get(0).getResult(); TaskDO resultTask = taskPersistenceService.getAllTask(instanceId, subInstanceId).get(0);
onFinished(subInstanceId, true, result, iterator); boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
onFinished(subInstanceId, success, resultTask.getResult(), iterator);
continue; continue;
// MAP 不关心结果最简单 // MAP 不关心结果最简单
case MAP: case MAP:
result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum); String result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
onFinished(subInstanceId, holder.failedNum == 0, result, iterator); onFinished(subInstanceId, holder.failedNum == 0, result, iterator);
continue; continue;
// MapReduce BroadCast 需要根据是否有 LAST_TASK 来判断结束与否 // MapReduce BroadCast 需要根据是否有 LAST_TASK 来判断结束与否

View File

@ -146,7 +146,7 @@ public abstract class TaskTracker {
// 处理失败的情况 // 处理失败的情况
int configTaskRetryNum = instanceInfo.getTaskRetryNum(); int configTaskRetryNum = instanceInfo.getTaskRetryNum();
if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum > 1) { if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) {
// 失败不是主要的情况多查一次数据库也问题不大况且前面有缓存顶着大部分情况之前不会去查DB // 失败不是主要的情况多查一次数据库也问题不大况且前面有缓存顶着大部分情况之前不会去查DB
Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId); Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);