feat: Complete all testing and ready for release

This commit is contained in:
tjq 2023-08-13 22:29:31 +08:00
parent c08b4f1858
commit 15fa1abd91

View File

@ -1,9 +1,11 @@
package tech.powerjob.official.processors.impl;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import lombok.*;
import org.apache.commons.lang3.RandomStringUtils;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.official.processors.CommonBasicProcessor;
import tech.powerjob.official.processors.util.CommonUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
@ -27,11 +29,6 @@ import java.util.concurrent.ThreadLocalRandom;
*/
public class VerificationProcessor extends CommonBasicProcessor implements MapReduceProcessor, BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext context) throws Exception {
return new ProcessResult(true, "preProcess successfully!");
}
@Override
protected ProcessResult process0(TaskContext taskContext) throws Exception {
@ -44,13 +41,25 @@ public class VerificationProcessor extends CommonBasicProcessor implements MapRe
switch (mode) {
case ERROR:
return new ProcessResult(false, "EXECUTE_FAILED_DUE_TO_CONFIG");
return new ProcessResult(false, "EXECUTE_FAILED_FOR_TEST");
case EXCEPTION:
throw new PowerJobException("exception for test");
case TIMEOUT:
final Long sleepMs = Optional.ofNullable(verificationParam.getSleepMs()).orElse(3600000L);
Thread.sleep(sleepMs);
return new ProcessResult(true, "AFTER_SLEEP_" + sleepMs);
case RETRY:
int currentRetryTimes = taskContext.getCurrentRetryTimes();
int maxRetryTimes = taskContext.getMaxRetryTimes();
omsLogger.info("[Retry] currentRetryTimes: {}, maxRetryTimes: {}", currentRetryTimes, maxRetryTimes);
if (currentRetryTimes < maxRetryTimes) {
Thread.sleep(100);
omsLogger.info("[Retry] currentRetryTimes[{}] < maxRetryTimes[{}], return failed status!", currentRetryTimes, maxRetryTimes);
return new ProcessResult(false, "FAILED_UNTIL_LAST_RETRY_" + currentRetryTimes);
} else {
omsLogger.info("[Retry] last retry, return success status!");
return new ProcessResult(true, "RETRY_SUCCESSFULLY!");
}
case MR:
if (isRootTask()) {
final int batchNum = Optional.ofNullable(verificationParam.getBatchNum()).orElse(10);
@ -69,10 +78,14 @@ public class VerificationProcessor extends CommonBasicProcessor implements MapRe
omsLogger.info("[VerificationProcessor] all map successfully!");
return new ProcessResult(true, "MAP_SUCCESS");
} else {
String taskId = taskContext.getTaskId();
final Double successRate = Optional.ofNullable(verificationParam.getSubTaskSuccessRate()).orElse(0.5);
final double rd = ThreadLocalRandom.current().nextDouble(0, 1);
boolean success = rd <= successRate;
return new ProcessResult(success, String.format("taskId_%s_success_%s", taskContext.getTaskId(), success));
long processCost = ThreadLocalRandom.current().nextLong(277);
Thread.sleep(processCost);
omsLogger.info("[VerificationProcessor] [MR] taskId:{}, processCost: {}, success:{}", taskId, processCost, success);
return new ProcessResult(success, RandomStringUtils.randomAlphanumeric(3));
}
}
@ -84,9 +97,47 @@ public class VerificationProcessor extends CommonBasicProcessor implements MapRe
@Override
public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
return new ProcessResult(true, "REDUCE_SUCCESS");
List<String> successTaskIds = Lists.newArrayList();
List<String> failedTaskIds = Lists.newArrayList();
StringBuilder sb = new StringBuilder();
taskResults.forEach(taskResult -> {
sb.append("tId:").append(taskResult.getTaskId()).append(";")
.append("tSuc:").append(taskResult.isSuccess()).append(";")
.append("tRes:").append(taskResult.getResult());
if (taskResult.isSuccess()) {
successTaskIds.add(taskResult.getTaskId());
} else {
failedTaskIds.add(taskResult.getTaskId());
}
});
context.getOmsLogger().info("[Reduce] [summary] successTaskNum: {}, failedTaskNum: {}, successRate: {}",
successTaskIds.size(), failedTaskIds.size(), 1.0 * successTaskIds.size() / (successTaskIds.size() + failedTaskIds.size()));
context.getOmsLogger().info("[Reduce] successTaskIds: {}", successTaskIds);
context.getOmsLogger().info("[Reduce] failedTaskIds: {}", failedTaskIds);
return new ProcessResult(true, sb.toString());
}
/* ************************** 广播任务部分 ************************** */
@Override
public ProcessResult preProcess(TaskContext context) throws Exception {
context.getOmsLogger().info("start to preProcess, current worker IP is {}.", NetUtils.getLocalHost());
return new ProcessResult(true, "preProcess successfully!");
}
@Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("start to postProcess, current worker IP is {}.", NetUtils.getLocalHost());
omsLogger.info("====== All Node's Process Result ======");
taskResults.forEach(r -> omsLogger.info("taskId:{},success:{},result:{}", r.getTaskId(), r.isSuccess(), r.getResult()));
return new ProcessResult(true, "postProcess successfully!");
}
/* ************************** 广播任务部分 ************************** */
enum Mode {
/**
* 常规模式直接返回响应
@ -95,20 +146,29 @@ public class VerificationProcessor extends CommonBasicProcessor implements MapRe
BASE,
/**
* 超时sleep 一段时间测试超时控制
* {"mode":"TIMEOUT","sleepMs":3600000}
*/
TIMEOUT,
/**
* 测试执行失败响应返回 success = false
* {"mode":"ERROR"}
*/
ERROR,
/**
* 测试执行异常抛出异常
* {"mode":"EXCEPTION"}
*/
EXCEPTION,
/**
* MapReduce
* MapReduce需要控制台配置为 MapReduce 执行模式
* {"mode":"MR","batchNum": 10, "batchSize": 20,"subTaskSuccessRate":0.7}
*/
MR
MR,
/**
* 重试后成功JOB 配置 Task 重试次数
* {"mode":"EXCEPTION"}
*/
RETRY
;
public static Mode of(String v) {