diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/VerificationProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/VerificationProcessor.java index 9a882dc0..4588c22e 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/VerificationProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/VerificationProcessor.java @@ -1,9 +1,11 @@ package tech.powerjob.official.processors.impl; import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Lists; import lombok.*; import org.apache.commons.lang3.RandomStringUtils; import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.utils.NetUtils; import tech.powerjob.official.processors.CommonBasicProcessor; import tech.powerjob.official.processors.util.CommonUtils; import tech.powerjob.worker.core.processor.ProcessResult; @@ -27,11 +29,6 @@ import java.util.concurrent.ThreadLocalRandom; */ public class VerificationProcessor extends CommonBasicProcessor implements MapReduceProcessor, BroadcastProcessor { - @Override - public ProcessResult preProcess(TaskContext context) throws Exception { - return new ProcessResult(true, "preProcess successfully!"); - } - @Override protected ProcessResult process0(TaskContext taskContext) throws Exception { @@ -44,13 +41,25 @@ public class VerificationProcessor extends CommonBasicProcessor implements MapRe switch (mode) { case ERROR: - return new ProcessResult(false, "EXECUTE_FAILED_DUE_TO_CONFIG"); + return new ProcessResult(false, "EXECUTE_FAILED_FOR_TEST"); case EXCEPTION: throw new PowerJobException("exception for test"); case TIMEOUT: final Long sleepMs = Optional.ofNullable(verificationParam.getSleepMs()).orElse(3600000L); Thread.sleep(sleepMs); return new ProcessResult(true, "AFTER_SLEEP_" + sleepMs); + case RETRY: + int currentRetryTimes = taskContext.getCurrentRetryTimes(); + int maxRetryTimes = taskContext.getMaxRetryTimes(); + omsLogger.info("[Retry] currentRetryTimes: {}, maxRetryTimes: {}", currentRetryTimes, maxRetryTimes); + if (currentRetryTimes < maxRetryTimes) { + Thread.sleep(100); + omsLogger.info("[Retry] currentRetryTimes[{}] < maxRetryTimes[{}], return failed status!", currentRetryTimes, maxRetryTimes); + return new ProcessResult(false, "FAILED_UNTIL_LAST_RETRY_" + currentRetryTimes); + } else { + omsLogger.info("[Retry] last retry, return success status!"); + return new ProcessResult(true, "RETRY_SUCCESSFULLY!"); + } case MR: if (isRootTask()) { final int batchNum = Optional.ofNullable(verificationParam.getBatchNum()).orElse(10); @@ -69,10 +78,14 @@ public class VerificationProcessor extends CommonBasicProcessor implements MapRe omsLogger.info("[VerificationProcessor] all map successfully!"); return new ProcessResult(true, "MAP_SUCCESS"); } else { + String taskId = taskContext.getTaskId(); final Double successRate = Optional.ofNullable(verificationParam.getSubTaskSuccessRate()).orElse(0.5); final double rd = ThreadLocalRandom.current().nextDouble(0, 1); boolean success = rd <= successRate; - return new ProcessResult(success, String.format("taskId_%s_success_%s", taskContext.getTaskId(), success)); + long processCost = ThreadLocalRandom.current().nextLong(277); + Thread.sleep(processCost); + omsLogger.info("[VerificationProcessor] [MR] taskId:{}, processCost: {}, success:{}", taskId, processCost, success); + return new ProcessResult(success, RandomStringUtils.randomAlphanumeric(3)); } } @@ -84,9 +97,47 @@ public class VerificationProcessor extends CommonBasicProcessor implements MapRe @Override public ProcessResult reduce(TaskContext context, List taskResults) { - return new ProcessResult(true, "REDUCE_SUCCESS"); + List successTaskIds = Lists.newArrayList(); + List failedTaskIds = Lists.newArrayList(); + StringBuilder sb = new StringBuilder(); + taskResults.forEach(taskResult -> { + sb.append("tId:").append(taskResult.getTaskId()).append(";") + .append("tSuc:").append(taskResult.isSuccess()).append(";") + .append("tRes:").append(taskResult.getResult()); + if (taskResult.isSuccess()) { + successTaskIds.add(taskResult.getTaskId()); + } else { + failedTaskIds.add(taskResult.getTaskId()); + } + }); + + context.getOmsLogger().info("[Reduce] [summary] successTaskNum: {}, failedTaskNum: {}, successRate: {}", + successTaskIds.size(), failedTaskIds.size(), 1.0 * successTaskIds.size() / (successTaskIds.size() + failedTaskIds.size())); + context.getOmsLogger().info("[Reduce] successTaskIds: {}", successTaskIds); + context.getOmsLogger().info("[Reduce] failedTaskIds: {}", failedTaskIds); + + return new ProcessResult(true, sb.toString()); } + /* ************************** 广播任务部分 ************************** */ + + @Override + public ProcessResult preProcess(TaskContext context) throws Exception { + context.getOmsLogger().info("start to preProcess, current worker IP is {}.", NetUtils.getLocalHost()); + return new ProcessResult(true, "preProcess successfully!"); + } + + @Override + public ProcessResult postProcess(TaskContext context, List taskResults) throws Exception { + OmsLogger omsLogger = context.getOmsLogger(); + omsLogger.info("start to postProcess, current worker IP is {}.", NetUtils.getLocalHost()); + omsLogger.info("====== All Node's Process Result ======"); + taskResults.forEach(r -> omsLogger.info("taskId:{},success:{},result:{}", r.getTaskId(), r.isSuccess(), r.getResult())); + return new ProcessResult(true, "postProcess successfully!"); + } + + /* ************************** 广播任务部分 ************************** */ + enum Mode { /** * 常规模式,直接返回响应 @@ -95,20 +146,29 @@ public class VerificationProcessor extends CommonBasicProcessor implements MapRe BASE, /** * 超时,sleep 一段时间测试超时控制 + * {"mode":"TIMEOUT","sleepMs":3600000} */ TIMEOUT, /** * 测试执行失败,响应返回 success = false + * {"mode":"ERROR"} */ ERROR, /** * 测试执行异常,抛出异常 + * {"mode":"EXCEPTION"} */ EXCEPTION, /** - * MapReduce + * MapReduce,需要控制台配置为 MapReduce 执行模式 + * {"mode":"MR","batchNum": 10, "batchSize": 20,"subTaskSuccessRate":0.7} */ - MR + MR, + /** + * 重试后成功,JOB 配置 Task 重试次数 + * {"mode":"EXCEPTION"} + */ + RETRY ; public static Mode of(String v) {